diff --git a/src/main/java/org/redisson/RedissonKeysReactive.java b/src/main/java/org/redisson/RedissonKeysReactive.java new file mode 100644 index 000000000..56833c86d --- /dev/null +++ b/src/main/java/org/redisson/RedissonKeysReactive.java @@ -0,0 +1,213 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.cluster.ClusterSlotRange; +import org.redisson.command.CommandReactiveService; +import org.redisson.core.RKeysReactive; + +import reactor.rx.Stream; +import reactor.rx.Streams; +import reactor.rx.subscription.ReactiveSubscription; + +public class RedissonKeysReactive implements RKeysReactive { + + private final CommandReactiveService commandExecutor; + + public RedissonKeysReactive(CommandReactiveService commandExecutor) { + super(); + this.commandExecutor = commandExecutor; + } + + @Override + public Publisher getSlot(String key) { + return commandExecutor.readObservable(null, RedisCommands.KEYSLOT, key); + } + + @Override + public Publisher getKeysByPattern(final String pattern) { + List> publishers = new ArrayList>(); + for (ClusterSlotRange slot : commandExecutor.getConnectionManager().getEntries().keySet()) { + publishers.add(createKeysIterator(slot.getStartSlot(), pattern)); + } + return Streams.merge(publishers); + } + + @Override + public Publisher getKeys() { + return getKeysByPattern(null); + } + + private Publisher> scanIterator(int slot, long startPos, String pattern) { + if (pattern == null) { + return commandExecutor.writeObservable(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos); + } + return commandExecutor.writeObservable(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern); + } + + private Publisher createKeysIterator(final int slot, final String pattern) { + return new Stream() { + + @Override + public void subscribe(final Subscriber t) { + t.onSubscribe(new ReactiveSubscription(this, t) { + + private List firstValues; + private long nextIterPos; + private InetSocketAddress client; + + private long currentIndex; + + @Override + protected void onRequest(final long n) { + currentIndex = n; + nextValues(); + } + + protected void nextValues() { + final ReactiveSubscription m = this; + scanIterator(slot, nextIterPos, pattern).subscribe(new Subscriber>() { + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(ListScanResult res) { + client = res.getRedisClient(); + + long prevIterPos = nextIterPos; + if (nextIterPos == 0 && firstValues == null) { + firstValues = res.getValues(); + } else if (res.getValues().equals(firstValues)) { + m.onComplete(); + currentIndex = 0; + return; + } + + nextIterPos = res.getPos(); + if (prevIterPos == nextIterPos) { + nextIterPos = -1; + } + for (String val : res.getValues()) { + m.onNext(val); + currentIndex--; + if (currentIndex == 0) { + m.onComplete(); + return; + } + } + if (nextIterPos == -1) { + m.onComplete(); + currentIndex = 0; + } + } + + @Override + public void onError(Throwable error) { + m.onError(error); + } + + @Override + public void onComplete() { + if (currentIndex == 0) { + return; + } + nextValues(); + } + }); + } + }); + } + + }; + } + + @Override + public Publisher randomKey() { + return commandExecutor.readRandomObservable(RedisCommands.RANDOM_KEY); + } + + /** + * Delete multiple objects by a key pattern + * + * Supported glob-style patterns: + * h?llo subscribes to hello, hallo and hxllo + * h*llo subscribes to hllo and heeeello + * h[ae]llo subscribes to hello and hallo, but not hillo + * + * @param pattern + * @return + */ + @Override + public Publisher deleteByPattern(String pattern) { + return commandExecutor.evalWriteAllObservable(RedisCommands.EVAL_LONG, new SlotCallback() { + AtomicLong results = new AtomicLong(); + @Override + public void onSlotResult(Long result) { + results.addAndGet(result); + } + + @Override + public Long onFinish() { + return results.get(); + } + }, "local keys = redis.call('keys', ARGV[1]) " + + "local n = 0 " + + "for i=1, table.getn(keys),5000 do " + + "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) " + + "end " + + "return n;",Collections.emptyList(), pattern); + } + + /** + * Delete multiple objects by name + * + * @param keys - object names + * @return + */ + @Override + public Publisher delete(String ... keys) { + return commandExecutor.writeAllObservable(RedisCommands.DEL, new SlotCallback() { + AtomicLong results = new AtomicLong(); + @Override + public void onSlotResult(Long result) { + results.addAndGet(result); + } + + @Override + public Long onFinish() { + return results.get(); + } + }, (Object[])keys); + } + + +} diff --git a/src/main/java/org/redisson/RedissonReactive.java b/src/main/java/org/redisson/RedissonReactive.java index 24d4fc9ae..8a12c4127 100644 --- a/src/main/java/org/redisson/RedissonReactive.java +++ b/src/main/java/org/redisson/RedissonReactive.java @@ -36,6 +36,7 @@ import org.redisson.core.RBlockingQueueReactive; import org.redisson.core.RBucketReactive; import org.redisson.core.RDequeReactive; import org.redisson.core.RHyperLogLogReactive; +import org.redisson.core.RKeysReactive; import org.redisson.core.RLexSortedSetReactive; import org.redisson.core.RListReactive; import org.redisson.core.RMapReactive; @@ -238,6 +239,11 @@ public class RedissonReactive implements RedissonReactiveClient { return new RedissonBatchReactive(connectionManager); } + @Override + public RKeysReactive getKeys() { + return new RedissonKeysReactive(commandExecutor); + } + public Config getConfig() { return config; } diff --git a/src/main/java/org/redisson/RedissonReactiveClient.java b/src/main/java/org/redisson/RedissonReactiveClient.java index 3af9b9118..f4c4e416c 100644 --- a/src/main/java/org/redisson/RedissonReactiveClient.java +++ b/src/main/java/org/redisson/RedissonReactiveClient.java @@ -25,6 +25,7 @@ import org.redisson.core.RBlockingQueueReactive; import org.redisson.core.RBucketReactive; import org.redisson.core.RDequeReactive; import org.redisson.core.RHyperLogLogReactive; +import org.redisson.core.RKeysReactive; import org.redisson.core.RLexSortedSet; import org.redisson.core.RLexSortedSetReactive; import org.redisson.core.RListReactive; @@ -196,13 +197,13 @@ public interface RedissonReactiveClient { */ RBatchReactive createBatch(); -// /** -// * Returns keys operations. -// * Each of Redis/Redisson object associated with own key -// * -// * @return -// */ -// RKeys getKeys(); + /** + * Returns keys operations. + * Each of Redis/Redisson object associated with own key + * + * @return + */ + RKeysReactive getKeys(); /** * Shuts down Redisson instance NOT Redis server diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index df9f815a4..a326762df 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -131,7 +131,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } @Override - public Future readRandomAsync(final RedisCommand command, final Object ... params) { + public Future readRandomAsync(RedisCommand command, Object ... params) { final Promise mainPromise = connectionManager.newPromise(); final List slots = new ArrayList(connectionManager.getEntries().keySet()); Collections.shuffle(slots); diff --git a/src/main/java/org/redisson/command/CommandReactiveExecutor.java b/src/main/java/org/redisson/command/CommandReactiveExecutor.java index 5534f63e5..f2d509525 100644 --- a/src/main/java/org/redisson/command/CommandReactiveExecutor.java +++ b/src/main/java/org/redisson/command/CommandReactiveExecutor.java @@ -16,6 +16,7 @@ package org.redisson.command; import java.net.InetSocketAddress; +import java.util.Collection; import java.util.List; import org.reactivestreams.Publisher; @@ -33,6 +34,14 @@ public interface CommandReactiveExecutor { ConnectionManager getConnectionManager(); + Publisher evalWriteAllObservable(RedisCommand command, SlotCallback callback, String script, List keys, Object ... params); + + Publisher> readAllObservable(RedisCommand command, Object ... params); + + Publisher readRandomObservable(RedisCommand command, Object ... params); + + Publisher writeObservable(Integer slot, Codec codec, RedisCommand command, Object ... params); + Publisher writeAllObservable(RedisCommand command, Object ... params); Publisher writeAllObservable(RedisCommand command, SlotCallback callback, Object ... params); diff --git a/src/main/java/org/redisson/command/CommandReactiveService.java b/src/main/java/org/redisson/command/CommandReactiveService.java index 03d6c9f95..941f59ea2 100644 --- a/src/main/java/org/redisson/command/CommandReactiveService.java +++ b/src/main/java/org/redisson/command/CommandReactiveService.java @@ -16,6 +16,7 @@ package org.redisson.command; import java.net.InetSocketAddress; +import java.util.Collection; import java.util.List; import org.reactivestreams.Publisher; @@ -38,6 +39,24 @@ public class CommandReactiveService extends CommandAsyncService implements Comma super(connectionManager); } + @Override + public Publisher evalWriteAllObservable(RedisCommand command, SlotCallback callback, String script, List keys, Object ... params) { + Future f = evalWriteAllAsync(command, callback, script, keys, params); + return new NettyFuturePublisher(f); + } + + @Override + public Publisher> readAllObservable(RedisCommand command, Object ... params) { + Future> f = readAllAsync(command, params); + return new NettyFuturePublisher>(f); + } + + @Override + public Publisher readRandomObservable(RedisCommand command, Object ... params) { + Future f = readRandomAsync(command, params); + return new NettyFuturePublisher(f); + } + @Override public Publisher readObservable(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object ... params) { Future f = readAsync(client, key, codec, command, params); @@ -55,6 +74,11 @@ public class CommandReactiveService extends CommandAsyncService implements Comma return new NettyFuturePublisher(f); } + public Publisher writeObservable(Integer slot, Codec codec, RedisCommand command, Object ... params) { + Future f = writeAsync(slot, codec, command, params); + return new NettyFuturePublisher(f); + } + @Override public Publisher readObservable(String key, RedisCommand command, Object ... params) { return readObservable(key, connectionManager.getCodec(), command, params); diff --git a/src/main/java/org/redisson/core/RKeysReactive.java b/src/main/java/org/redisson/core/RKeysReactive.java new file mode 100644 index 000000000..60ef15221 --- /dev/null +++ b/src/main/java/org/redisson/core/RKeysReactive.java @@ -0,0 +1,63 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import org.reactivestreams.Publisher; + +public interface RKeysReactive { + + Publisher getKeys(); + + Publisher getKeysByPattern(String pattern); + + /** + * Get hash slot identifier for key in mode. + * Available for cluster nodes only + * + * @param key + * @return + */ + Publisher getSlot(String key); + + /** + * Get random key in mode + * + * @return + */ + Publisher randomKey(); + + /** + * Delete multiple objects by a key pattern in mode + * + * Supported glob-style patterns: + * h?llo subscribes to hello, hallo and hxllo + * h*llo subscribes to hllo and heeeello + * h[ae]llo subscribes to hello and hallo, but not hillo + * + * @param pattern + * @return + */ + Publisher deleteByPattern(String pattern); + + /** + * Delete multiple objects by name in mode + * + * @param keys - object names + * @return + */ + Publisher delete(String ... keys); + +} diff --git a/src/test/java/org/redisson/RedissonKeysReactiveTest.java b/src/test/java/org/redisson/RedissonKeysReactiveTest.java new file mode 100644 index 000000000..796442992 --- /dev/null +++ b/src/test/java/org/redisson/RedissonKeysReactiveTest.java @@ -0,0 +1,64 @@ +package org.redisson; + +import java.util.Iterator; + +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; +import org.redisson.core.RBucketReactive; +import org.redisson.core.RMapReactive; + +public class RedissonKeysReactiveTest extends BaseReactiveTest { + + @Test + public void testKeysIterablePattern() { + redisson.getBucket("test1").set("someValue"); + redisson.getBucket("test2").set("someValue"); + + redisson.getBucket("test12").set("someValue"); + + Iterator iterator = toIterator(redisson.getKeys().getKeysByPattern("test?")); + for (; iterator.hasNext();) { + String key = iterator.next(); + MatcherAssert.assertThat(key, Matchers.isOneOf("test1", "test2")); + } + } + + @Test + public void testRandomKey() { + RBucketReactive bucket = redisson.getBucket("test1"); + sync(bucket.set("someValue1")); + + RBucketReactive bucket2 = redisson.getBucket("test2"); + sync(bucket2.set("someValue2")); + + MatcherAssert.assertThat(sync(redisson.getKeys().randomKey()), Matchers.isOneOf("test1", "test2")); + sync(redisson.getKeys().delete("test1")); + Assert.assertEquals(sync(redisson.getKeys().randomKey()), "test2"); + redisson.flushdb(); + Assert.assertNull(sync(redisson.getKeys().randomKey())); + } + + @Test + public void testDeleteByPattern() { + RBucketReactive bucket = redisson.getBucket("test1"); + sync(bucket.set("someValue")); + RMapReactive map = redisson.getMap("test2"); + sync(map.fastPut("1", "2")); + + Assert.assertEquals(2, sync(redisson.getKeys().deleteByPattern("test?")).intValue()); + } + + @Test + public void testMassDelete() { + RBucketReactive bucket = redisson.getBucket("test"); + sync(bucket.set("someValue")); + RMapReactive map = redisson.getMap("map2"); + sync(map.fastPut("1", "2")); + + Assert.assertEquals(2, sync(redisson.getKeys().delete("test", "map2")).intValue()); + Assert.assertEquals(0, sync(redisson.getKeys().delete("test", "map2")).intValue()); + } + +}