RKeysReactive added. #210

pull/337/head
Nikita 9 years ago
parent e48cae2571
commit 66b31a4af0

@ -0,0 +1,213 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.command.CommandReactiveService;
import org.redisson.core.RKeysReactive;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.subscription.ReactiveSubscription;
public class RedissonKeysReactive implements RKeysReactive {
private final CommandReactiveService commandExecutor;
public RedissonKeysReactive(CommandReactiveService commandExecutor) {
super();
this.commandExecutor = commandExecutor;
}
@Override
public Publisher<Integer> getSlot(String key) {
return commandExecutor.readObservable(null, RedisCommands.KEYSLOT, key);
}
@Override
public Publisher<String> getKeysByPattern(final String pattern) {
List<Publisher<String>> publishers = new ArrayList<Publisher<String>>();
for (ClusterSlotRange slot : commandExecutor.getConnectionManager().getEntries().keySet()) {
publishers.add(createKeysIterator(slot.getStartSlot(), pattern));
}
return Streams.merge(publishers);
}
@Override
public Publisher<String> getKeys() {
return getKeysByPattern(null);
}
private Publisher<ListScanResult<String>> scanIterator(int slot, long startPos, String pattern) {
if (pattern == null) {
return commandExecutor.writeObservable(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos);
}
return commandExecutor.writeObservable(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern);
}
private Publisher<String> createKeysIterator(final int slot, final String pattern) {
return new Stream<String>() {
@Override
public void subscribe(final Subscriber<? super String> t) {
t.onSubscribe(new ReactiveSubscription<String>(this, t) {
private List<String> firstValues;
private long nextIterPos;
private InetSocketAddress client;
private long currentIndex;
@Override
protected void onRequest(final long n) {
currentIndex = n;
nextValues();
}
protected void nextValues() {
final ReactiveSubscription<String> m = this;
scanIterator(slot, nextIterPos, pattern).subscribe(new Subscriber<ListScanResult<String>>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(ListScanResult<String> res) {
client = res.getRedisClient();
long prevIterPos = nextIterPos;
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
m.onComplete();
currentIndex = 0;
return;
}
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
nextIterPos = -1;
}
for (String val : res.getValues()) {
m.onNext(val);
currentIndex--;
if (currentIndex == 0) {
m.onComplete();
return;
}
}
if (nextIterPos == -1) {
m.onComplete();
currentIndex = 0;
}
}
@Override
public void onError(Throwable error) {
m.onError(error);
}
@Override
public void onComplete() {
if (currentIndex == 0) {
return;
}
nextValues();
}
});
}
});
}
};
}
@Override
public Publisher<String> randomKey() {
return commandExecutor.readRandomObservable(RedisCommands.RANDOM_KEY);
}
/**
* Delete multiple objects by a key pattern
*
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern
* @return
*/
@Override
public Publisher<Long> deleteByPattern(String pattern) {
return commandExecutor.evalWriteAllObservable(RedisCommands.EVAL_LONG, new SlotCallback<Long, Long>() {
AtomicLong results = new AtomicLong();
@Override
public void onSlotResult(Long result) {
results.addAndGet(result);
}
@Override
public Long onFinish() {
return results.get();
}
}, "local keys = redis.call('keys', ARGV[1]) "
+ "local n = 0 "
+ "for i=1, table.getn(keys),5000 do "
+ "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) "
+ "end "
+ "return n;",Collections.emptyList(), pattern);
}
/**
* Delete multiple objects by name
*
* @param keys - object names
* @return
*/
@Override
public Publisher<Long> delete(String ... keys) {
return commandExecutor.writeAllObservable(RedisCommands.DEL, new SlotCallback<Long, Long>() {
AtomicLong results = new AtomicLong();
@Override
public void onSlotResult(Long result) {
results.addAndGet(result);
}
@Override
public Long onFinish() {
return results.get();
}
}, (Object[])keys);
}
}

@ -36,6 +36,7 @@ import org.redisson.core.RBlockingQueueReactive;
import org.redisson.core.RBucketReactive; import org.redisson.core.RBucketReactive;
import org.redisson.core.RDequeReactive; import org.redisson.core.RDequeReactive;
import org.redisson.core.RHyperLogLogReactive; import org.redisson.core.RHyperLogLogReactive;
import org.redisson.core.RKeysReactive;
import org.redisson.core.RLexSortedSetReactive; import org.redisson.core.RLexSortedSetReactive;
import org.redisson.core.RListReactive; import org.redisson.core.RListReactive;
import org.redisson.core.RMapReactive; import org.redisson.core.RMapReactive;
@ -238,6 +239,11 @@ public class RedissonReactive implements RedissonReactiveClient {
return new RedissonBatchReactive(connectionManager); return new RedissonBatchReactive(connectionManager);
} }
@Override
public RKeysReactive getKeys() {
return new RedissonKeysReactive(commandExecutor);
}
public Config getConfig() { public Config getConfig() {
return config; return config;
} }

@ -25,6 +25,7 @@ import org.redisson.core.RBlockingQueueReactive;
import org.redisson.core.RBucketReactive; import org.redisson.core.RBucketReactive;
import org.redisson.core.RDequeReactive; import org.redisson.core.RDequeReactive;
import org.redisson.core.RHyperLogLogReactive; import org.redisson.core.RHyperLogLogReactive;
import org.redisson.core.RKeysReactive;
import org.redisson.core.RLexSortedSet; import org.redisson.core.RLexSortedSet;
import org.redisson.core.RLexSortedSetReactive; import org.redisson.core.RLexSortedSetReactive;
import org.redisson.core.RListReactive; import org.redisson.core.RListReactive;
@ -196,13 +197,13 @@ public interface RedissonReactiveClient {
*/ */
RBatchReactive createBatch(); RBatchReactive createBatch();
// /** /**
// * Returns keys operations. * Returns keys operations.
// * Each of Redis/Redisson object associated with own key * Each of Redis/Redisson object associated with own key
// * *
// * @return * @return
// */ */
// RKeys getKeys(); RKeysReactive getKeys();
/** /**
* Shuts down Redisson instance <b>NOT</b> Redis server * Shuts down Redisson instance <b>NOT</b> Redis server

@ -131,7 +131,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
@Override @Override
public <T, R> Future<R> readRandomAsync(final RedisCommand<T> command, final Object ... params) { public <T, R> Future<R> readRandomAsync(RedisCommand<T> command, Object ... params) {
final Promise<R> mainPromise = connectionManager.newPromise(); final Promise<R> mainPromise = connectionManager.newPromise();
final List<ClusterSlotRange> slots = new ArrayList<ClusterSlotRange>(connectionManager.getEntries().keySet()); final List<ClusterSlotRange> slots = new ArrayList<ClusterSlotRange>(connectionManager.getEntries().keySet());
Collections.shuffle(slots); Collections.shuffle(slots);

@ -16,6 +16,7 @@
package org.redisson.command; package org.redisson.command;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
@ -33,6 +34,14 @@ public interface CommandReactiveExecutor {
ConnectionManager getConnectionManager(); ConnectionManager getConnectionManager();
<T, R> Publisher<R> evalWriteAllObservable(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params);
<T, R> Publisher<Collection<R>> readAllObservable(RedisCommand<T> command, Object ... params);
<T, R> Publisher<R> readRandomObservable(RedisCommand<T> command, Object ... params);
<T, R> Publisher<R> writeObservable(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);
<T> Publisher<Void> writeAllObservable(RedisCommand<T> command, Object ... params); <T> Publisher<Void> writeAllObservable(RedisCommand<T> command, Object ... params);
<R, T> Publisher<R> writeAllObservable(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params); <R, T> Publisher<R> writeAllObservable(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);

@ -16,6 +16,7 @@
package org.redisson.command; package org.redisson.command;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
@ -38,6 +39,24 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
super(connectionManager); super(connectionManager);
} }
@Override
public <T, R> Publisher<R> evalWriteAllObservable(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
Future<R> f = evalWriteAllAsync(command, callback, script, keys, params);
return new NettyFuturePublisher<R>(f);
}
@Override
public <T, R> Publisher<Collection<R>> readAllObservable(RedisCommand<T> command, Object ... params) {
Future<Collection<R>> f = readAllAsync(command, params);
return new NettyFuturePublisher<Collection<R>>(f);
}
@Override
public <T, R> Publisher<R> readRandomObservable(RedisCommand<T> command, Object ... params) {
Future<R> f = readRandomAsync(command, params);
return new NettyFuturePublisher<R>(f);
}
@Override @Override
public <T, R> Publisher<R> readObservable(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Publisher<R> readObservable(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> f = readAsync(client, key, codec, command, params); Future<R> f = readAsync(client, key, codec, command, params);
@ -55,6 +74,11 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
return new NettyFuturePublisher<R>(f); return new NettyFuturePublisher<R>(f);
} }
public <T, R> Publisher<R> writeObservable(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> f = writeAsync(slot, codec, command, params);
return new NettyFuturePublisher<R>(f);
}
@Override @Override
public <T, R> Publisher<R> readObservable(String key, RedisCommand<T> command, Object ... params) { public <T, R> Publisher<R> readObservable(String key, RedisCommand<T> command, Object ... params) {
return readObservable(key, connectionManager.getCodec(), command, params); return readObservable(key, connectionManager.getCodec(), command, params);

@ -0,0 +1,63 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import org.reactivestreams.Publisher;
public interface RKeysReactive {
Publisher<String> getKeys();
Publisher<String> getKeysByPattern(String pattern);
/**
* Get hash slot identifier for key in mode.
* Available for cluster nodes only
*
* @param key
* @return
*/
Publisher<Integer> getSlot(String key);
/**
* Get random key in mode
*
* @return
*/
Publisher<String> randomKey();
/**
* Delete multiple objects by a key pattern in mode
*
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern
* @return
*/
Publisher<Long> deleteByPattern(String pattern);
/**
* Delete multiple objects by name in mode
*
* @param keys - object names
* @return
*/
Publisher<Long> delete(String ... keys);
}

@ -0,0 +1,64 @@
package org.redisson;
import java.util.Iterator;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RBucketReactive;
import org.redisson.core.RMapReactive;
public class RedissonKeysReactiveTest extends BaseReactiveTest {
@Test
public void testKeysIterablePattern() {
redisson.getBucket("test1").set("someValue");
redisson.getBucket("test2").set("someValue");
redisson.getBucket("test12").set("someValue");
Iterator<String> iterator = toIterator(redisson.getKeys().getKeysByPattern("test?"));
for (; iterator.hasNext();) {
String key = iterator.next();
MatcherAssert.assertThat(key, Matchers.isOneOf("test1", "test2"));
}
}
@Test
public void testRandomKey() {
RBucketReactive<String> bucket = redisson.getBucket("test1");
sync(bucket.set("someValue1"));
RBucketReactive<String> bucket2 = redisson.getBucket("test2");
sync(bucket2.set("someValue2"));
MatcherAssert.assertThat(sync(redisson.getKeys().randomKey()), Matchers.isOneOf("test1", "test2"));
sync(redisson.getKeys().delete("test1"));
Assert.assertEquals(sync(redisson.getKeys().randomKey()), "test2");
redisson.flushdb();
Assert.assertNull(sync(redisson.getKeys().randomKey()));
}
@Test
public void testDeleteByPattern() {
RBucketReactive<String> bucket = redisson.getBucket("test1");
sync(bucket.set("someValue"));
RMapReactive<String, String> map = redisson.getMap("test2");
sync(map.fastPut("1", "2"));
Assert.assertEquals(2, sync(redisson.getKeys().deleteByPattern("test?")).intValue());
}
@Test
public void testMassDelete() {
RBucketReactive<String> bucket = redisson.getBucket("test");
sync(bucket.set("someValue"));
RMapReactive<String, String> map = redisson.getMap("map2");
sync(map.fastPut("1", "2"));
Assert.assertEquals(2, sync(redisson.getKeys().delete("test", "map2")).intValue());
Assert.assertEquals(0, sync(redisson.getKeys().delete("test", "map2")).intValue());
}
}
Loading…
Cancel
Save