From e89cce3b3ec85aa42054dd98b063c11dd22e9aae Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 19 Dec 2018 13:03:09 +0300 Subject: [PATCH 1/3] refactoring # Conflicts: # redisson/src/main/java/org/redisson/reactive/MapReactiveIterator.java # redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java # redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java # redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java # redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java --- .../redisson/client/protocol/RedisCommands.java | 2 +- .../redisson/command/CommandAsyncExecutor.java | 8 ++++++-- .../org/redisson/command/CommandAsyncService.java | 15 +++++++++++---- .../redisson/pubsub/PubSubConnectionEntry.java | 2 +- ...tiveIterator.java => MapReactiveIterator.java} | 11 ++++++++--- .../redisson/reactive/RedissonKeysReactive.java | 4 ++-- .../reactive/RedissonMapCacheReactive.java | 6 +++--- .../redisson/reactive/RedissonMapReactive.java | 6 +++--- 8 files changed, 35 insertions(+), 19 deletions(-) rename redisson/src/main/java/org/redisson/reactive/{RedissonMapReactiveIterator.java => MapReactiveIterator.java} (90%) diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 26a724c1a..825c67a10 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -146,7 +146,7 @@ public interface RedisCommands { RedisCommand> ZRANGEBYSCORE = new RedisCommand>("ZRANGEBYSCORE", new ObjectSetReplayDecoder()); RedisCommand> ZRANGEBYSCORE_LIST = new RedisCommand>("ZRANGEBYSCORE", new ObjectListReplayDecoder()); RedisCommand> ZREVRANGE = new RedisCommand>("ZREVRANGE", new ObjectListReplayDecoder()); - RedisCommand> ZREVRANGEBYSCORE = new RedisCommand>("ZREVRANGEBYSCORE", new ObjectListReplayDecoder()); + RedisCommand> ZREVRANGEBYSCORE = new RedisCommand>("ZREVRANGEBYSCORE", new ObjectSetReplayDecoder()); RedisCommand>> ZREVRANGE_ENTRY = new RedisCommand>>("ZREVRANGE", new ScoredSortedSetReplayDecoder()); RedisCommand>> ZREVRANGEBYSCORE_ENTRY = new RedisCommand>>("ZREVRANGEBYSCORE", new ScoredSortedSetReplayDecoder()); RedisCommand>> ZRANGE_ENTRY = new RedisCommand>>("ZRANGE", new ScoredSortedSetReplayDecoder()); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java index 05cd1552a..2121e3da2 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -58,6 +58,8 @@ public interface CommandAsyncExecutor { RFuture writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params); + RFuture writeAsync(byte[] key, Codec codec, RedisCommand command, Object... params); + RFuture readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params); RFuture readAsync(RedisClient client, String name, Codec codec, RedisCommand command, Object ... params); @@ -70,8 +72,12 @@ public interface CommandAsyncExecutor { RFuture writeAllAsync(RedisCommand command, SlotCallback callback, Object ... params); + RFuture> readAllAsync(Codec codec, RedisCommand command, Object... params); + RFuture readAllAsync(RedisCommand command, SlotCallback callback, Object ... params); + RFuture> readAllAsync(Collection results, Codec codec, RedisCommand command, Object... params); + RFuture evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); RFuture evalReadAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); @@ -90,8 +96,6 @@ public interface CommandAsyncExecutor { RFuture> readAllAsync(RedisCommand command, Object ... params); - RFuture> readAllAsync(Collection results, RedisCommand command, Object ... params); - RFuture writeAllAsync(Codec codec, RedisCommand command, SlotCallback callback, Object... params); RFuture writeAllAsync(RedisCommand command, Object ... params); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 93b3235cb..cabfd1ec7 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -245,13 +245,20 @@ public class CommandAsyncService implements CommandAsyncExecutor { return mainPromise; } + @Override + public RFuture> readAllAsync(Codec codec, RedisCommand command, Object... params) { + List results = new ArrayList(); + return readAllAsync(results, codec, command, params); + } + + @Override public RFuture> readAllAsync(RedisCommand command, Object... params) { List results = new ArrayList(); - return readAllAsync(results, command, params); + return readAllAsync(results, connectionManager.getCodec(), command, params); } @Override - public RFuture> readAllAsync(final Collection results, RedisCommand command, Object... params) { + public RFuture> readAllAsync(final Collection results, Codec codec, RedisCommand command, Object... params) { final RPromise> mainPromise = createPromise(); final Collection nodes = connectionManager.getEntrySet(); final AtomicInteger counter = new AtomicInteger(nodes.size()); @@ -284,11 +291,11 @@ public class CommandAsyncService implements CommandAsyncExecutor { for (MasterSlaveEntry entry : nodes) { RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true, null); + async(true, new NodeSource(entry), codec, command, params, promise, 0, true, null); } return mainPromise; } - + @Override public RFuture readRandomAsync(Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); diff --git a/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java b/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java index 5702a0445..36bcafb50 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java +++ b/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java @@ -138,7 +138,7 @@ public class PubSubConnectionEntry { return false; } - private void removeListener(ChannelName channelName, RedisPubSubListener listener) { + public void removeListener(ChannelName channelName, RedisPubSubListener listener) { Queue> queue = channelListeners.get(channelName); synchronized (queue) { if (queue.remove(listener) && queue.isEmpty()) { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/MapReactiveIterator.java similarity index 90% rename from redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java rename to redisson/src/main/java/org/redisson/reactive/MapReactiveIterator.java index be2fc4a32..0f75dba33 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/MapReactiveIterator.java @@ -21,6 +21,7 @@ import java.util.Map.Entry; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.redisson.RedissonMap; +import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.MapScanResult; @@ -37,13 +38,13 @@ import reactor.rx.subscription.ReactiveSubscription; * @param value type * @param entry type */ -public class RedissonMapReactiveIterator { +public class MapReactiveIterator { private final RedissonMap map; private final String pattern; private final int count; - public RedissonMapReactiveIterator(RedissonMap map, String pattern, int count) { + public MapReactiveIterator(RedissonMap map, String pattern, int count) { this.map = map; this.pattern = pattern; this.count = count; @@ -69,7 +70,7 @@ public class RedissonMapReactiveIterator { protected void nextValues() { final ReactiveSubscription m = this; - map.scanIteratorAsync(map.getName(), client, nextIterPos, pattern, count).addListener(new FutureListener>() { + scanIterator(client, nextIterPos).addListener(new FutureListener>() { @Override public void operationComplete(Future> future) @@ -129,4 +130,8 @@ public class RedissonMapReactiveIterator { }; } + public RFuture> scanIterator(RedisClient client, long nextIterPos) { + return map.scanIteratorAsync(map.getName(), client, nextIterPos, pattern, count); + } + } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java index fd104c204..6473b8a6c 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java @@ -38,11 +38,11 @@ import reactor.rx.subscription.ReactiveSubscription; */ public class RedissonKeysReactive { - private final CommandReactiveService commandExecutor; + private final CommandReactiveExecutor commandExecutor; private final RedissonKeys instance; - public RedissonKeysReactive(CommandReactiveService commandExecutor) { + public RedissonKeysReactive(CommandReactiveExecutor commandExecutor) { super(); instance = new RedissonKeys(commandExecutor); this.commandExecutor = commandExecutor; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index 45bd226f4..85a6319f8 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -50,7 +50,7 @@ public class RedissonMapCacheReactive { } public Publisher> entryIterator(String pattern, int count) { - return new RedissonMapReactiveIterator>((RedissonMap) mapCache, pattern, count).stream(); + return new MapReactiveIterator>((RedissonMap) mapCache, pattern, count).stream(); } public Publisher valueIterator() { @@ -66,7 +66,7 @@ public class RedissonMapCacheReactive { } public Publisher valueIterator(String pattern, int count) { - return new RedissonMapReactiveIterator((RedissonMap) mapCache, pattern, count) { + return new MapReactiveIterator((RedissonMap) mapCache, pattern, count) { @Override V getValue(Entry entry) { return (V) entry.getValue(); @@ -87,7 +87,7 @@ public class RedissonMapCacheReactive { } public Publisher keyIterator(String pattern, int count) { - return new RedissonMapReactiveIterator((RedissonMap) mapCache, pattern, count) { + return new MapReactiveIterator((RedissonMap) mapCache, pattern, count) { @Override K getValue(Entry entry) { return (K) entry.getKey(); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java index de6897314..fe8847dc6 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -60,7 +60,7 @@ public class RedissonMapReactive { } public Publisher> entryIterator(String pattern, int count) { - return new RedissonMapReactiveIterator>((RedissonMap) instance, pattern, count).stream(); + return new MapReactiveIterator>((RedissonMap) instance, pattern, count).stream(); } public Publisher valueIterator() { @@ -76,7 +76,7 @@ public class RedissonMapReactive { } public Publisher valueIterator(String pattern, int count) { - return new RedissonMapReactiveIterator((RedissonMap) instance, pattern, count) { + return new MapReactiveIterator((RedissonMap) instance, pattern, count) { @Override V getValue(Entry entry) { return (V) entry.getValue(); @@ -97,7 +97,7 @@ public class RedissonMapReactive { } public Publisher keyIterator(String pattern, int count) { - return new RedissonMapReactiveIterator((RedissonMap) instance, pattern, count) { + return new MapReactiveIterator((RedissonMap) instance, pattern, count) { @Override K getValue(Entry entry) { return (K) entry.getKey(); From b0afc60755a122f4efe6a47abfbe5bfd9efacfef Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 19 Dec 2018 13:20:24 +0300 Subject: [PATCH 2/3] refactoring --- .../org/redisson/spring/data/connection/RedissonConnection.java | 2 +- .../org/redisson/spring/data/connection/RedissonConnection.java | 2 +- .../org/redisson/spring/data/connection/RedissonConnection.java | 2 +- .../org/redisson/spring/data/connection/RedissonConnection.java | 2 +- redisson/src/main/java/org/redisson/RedissonMapCache.java | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index b7a092960..e29dd63a5 100644 --- a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -324,7 +324,7 @@ public class RedissonConnection extends AbstractRedisConnection { } Set results = new HashSet(); - RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, KEYS, pattern)); + RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, StringCodec.INSTANCE, KEYS, pattern)); return sync(f); } diff --git a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index b771e4cdb..605cde25e 100644 --- a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -326,7 +326,7 @@ public class RedissonConnection extends AbstractRedisConnection { } Set results = new HashSet(); - RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, KEYS, pattern)); + RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, StringCodec.INSTANCE, KEYS, pattern)); return sync(f); } diff --git a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index 38cd42699..2426691ea 100644 --- a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -335,7 +335,7 @@ public class RedissonConnection extends AbstractRedisConnection { } Set results = new HashSet(); - RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, KEYS, pattern)); + RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, StringCodec.INSTANCE, KEYS, pattern)); return sync(f); } diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index 9a993a5a2..56830866b 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -336,7 +336,7 @@ public class RedissonConnection extends AbstractRedisConnection { } Set results = new HashSet(); - RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, KEYS, pattern)); + RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, StringCodec.INSTANCE, KEYS, pattern)); return sync(f); } diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index cac516bea..8ae2054f3 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -165,7 +165,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " return 1;" + "end;" + "return 0; ", - Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getLastAccessTimeSetNameByKey(key), getOptionsName(key)), + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getLastAccessTimeSetNameByKey(key), getOptionsNameByKey(key)), System.currentTimeMillis(), encodeMapKey(key)); } From b9f141dba14421694ff76183cd243b8f013b94b1 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 19 Dec 2018 13:29:09 +0300 Subject: [PATCH 3/3] refactoring --- .../org/redisson/spring/data/connection/RedissonConnection.java | 2 +- .../org/redisson/spring/data/connection/RedissonConnection.java | 2 +- .../org/redisson/spring/data/connection/RedissonConnection.java | 2 +- .../org/redisson/spring/data/connection/RedissonConnection.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index e29dd63a5..39e0481b5 100644 --- a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -324,7 +324,7 @@ public class RedissonConnection extends AbstractRedisConnection { } Set results = new HashSet(); - RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, StringCodec.INSTANCE, KEYS, pattern)); + RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, ByteArrayCodec.INSTANCE, KEYS, pattern)); return sync(f); } diff --git a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index 605cde25e..19c0423da 100644 --- a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -326,7 +326,7 @@ public class RedissonConnection extends AbstractRedisConnection { } Set results = new HashSet(); - RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, StringCodec.INSTANCE, KEYS, pattern)); + RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, ByteArrayCodec.INSTANCE, KEYS, pattern)); return sync(f); } diff --git a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index 2426691ea..786b5cb7b 100644 --- a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -335,7 +335,7 @@ public class RedissonConnection extends AbstractRedisConnection { } Set results = new HashSet(); - RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, StringCodec.INSTANCE, KEYS, pattern)); + RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, ByteArrayCodec.INSTANCE, KEYS, pattern)); return sync(f); } diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index 56830866b..ab7fb7f95 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -336,7 +336,7 @@ public class RedissonConnection extends AbstractRedisConnection { } Set results = new HashSet(); - RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, StringCodec.INSTANCE, KEYS, pattern)); + RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(results, ByteArrayCodec.INSTANCE, KEYS, pattern)); return sync(f); }