From e89cce3b3ec85aa42054dd98b063c11dd22e9aae Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 19 Dec 2018 13:03:09 +0300 Subject: [PATCH] refactoring # Conflicts: # redisson/src/main/java/org/redisson/reactive/MapReactiveIterator.java # redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java # redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java # redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java # redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java --- .../redisson/client/protocol/RedisCommands.java | 2 +- .../redisson/command/CommandAsyncExecutor.java | 8 ++++++-- .../org/redisson/command/CommandAsyncService.java | 15 +++++++++++---- .../redisson/pubsub/PubSubConnectionEntry.java | 2 +- ...tiveIterator.java => MapReactiveIterator.java} | 11 ++++++++--- .../redisson/reactive/RedissonKeysReactive.java | 4 ++-- .../reactive/RedissonMapCacheReactive.java | 6 +++--- .../redisson/reactive/RedissonMapReactive.java | 6 +++--- 8 files changed, 35 insertions(+), 19 deletions(-) rename redisson/src/main/java/org/redisson/reactive/{RedissonMapReactiveIterator.java => MapReactiveIterator.java} (90%) diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 26a724c1a..825c67a10 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -146,7 +146,7 @@ public interface RedisCommands { RedisCommand> ZRANGEBYSCORE = new RedisCommand>("ZRANGEBYSCORE", new ObjectSetReplayDecoder()); RedisCommand> ZRANGEBYSCORE_LIST = new RedisCommand>("ZRANGEBYSCORE", new ObjectListReplayDecoder()); RedisCommand> ZREVRANGE = new RedisCommand>("ZREVRANGE", new ObjectListReplayDecoder()); - RedisCommand> ZREVRANGEBYSCORE = new RedisCommand>("ZREVRANGEBYSCORE", new ObjectListReplayDecoder()); + RedisCommand> ZREVRANGEBYSCORE = new RedisCommand>("ZREVRANGEBYSCORE", new ObjectSetReplayDecoder()); RedisCommand>> ZREVRANGE_ENTRY = new RedisCommand>>("ZREVRANGE", new ScoredSortedSetReplayDecoder()); RedisCommand>> ZREVRANGEBYSCORE_ENTRY = new RedisCommand>>("ZREVRANGEBYSCORE", new ScoredSortedSetReplayDecoder()); RedisCommand>> ZRANGE_ENTRY = new RedisCommand>>("ZRANGE", new ScoredSortedSetReplayDecoder()); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java index 05cd1552a..2121e3da2 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -58,6 +58,8 @@ public interface CommandAsyncExecutor { RFuture writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params); + RFuture writeAsync(byte[] key, Codec codec, RedisCommand command, Object... params); + RFuture readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params); RFuture readAsync(RedisClient client, String name, Codec codec, RedisCommand command, Object ... params); @@ -70,8 +72,12 @@ public interface CommandAsyncExecutor { RFuture writeAllAsync(RedisCommand command, SlotCallback callback, Object ... params); + RFuture> readAllAsync(Codec codec, RedisCommand command, Object... params); + RFuture readAllAsync(RedisCommand command, SlotCallback callback, Object ... params); + RFuture> readAllAsync(Collection results, Codec codec, RedisCommand command, Object... params); + RFuture evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); RFuture evalReadAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); @@ -90,8 +96,6 @@ public interface CommandAsyncExecutor { RFuture> readAllAsync(RedisCommand command, Object ... params); - RFuture> readAllAsync(Collection results, RedisCommand command, Object ... params); - RFuture writeAllAsync(Codec codec, RedisCommand command, SlotCallback callback, Object... params); RFuture writeAllAsync(RedisCommand command, Object ... params); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 93b3235cb..cabfd1ec7 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -245,13 +245,20 @@ public class CommandAsyncService implements CommandAsyncExecutor { return mainPromise; } + @Override + public RFuture> readAllAsync(Codec codec, RedisCommand command, Object... params) { + List results = new ArrayList(); + return readAllAsync(results, codec, command, params); + } + + @Override public RFuture> readAllAsync(RedisCommand command, Object... params) { List results = new ArrayList(); - return readAllAsync(results, command, params); + return readAllAsync(results, connectionManager.getCodec(), command, params); } @Override - public RFuture> readAllAsync(final Collection results, RedisCommand command, Object... params) { + public RFuture> readAllAsync(final Collection results, Codec codec, RedisCommand command, Object... params) { final RPromise> mainPromise = createPromise(); final Collection nodes = connectionManager.getEntrySet(); final AtomicInteger counter = new AtomicInteger(nodes.size()); @@ -284,11 +291,11 @@ public class CommandAsyncService implements CommandAsyncExecutor { for (MasterSlaveEntry entry : nodes) { RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true, null); + async(true, new NodeSource(entry), codec, command, params, promise, 0, true, null); } return mainPromise; } - + @Override public RFuture readRandomAsync(Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); diff --git a/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java b/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java index 5702a0445..36bcafb50 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java +++ b/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java @@ -138,7 +138,7 @@ public class PubSubConnectionEntry { return false; } - private void removeListener(ChannelName channelName, RedisPubSubListener listener) { + public void removeListener(ChannelName channelName, RedisPubSubListener listener) { Queue> queue = channelListeners.get(channelName); synchronized (queue) { if (queue.remove(listener) && queue.isEmpty()) { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/MapReactiveIterator.java similarity index 90% rename from redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java rename to redisson/src/main/java/org/redisson/reactive/MapReactiveIterator.java index be2fc4a32..0f75dba33 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/MapReactiveIterator.java @@ -21,6 +21,7 @@ import java.util.Map.Entry; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.redisson.RedissonMap; +import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.MapScanResult; @@ -37,13 +38,13 @@ import reactor.rx.subscription.ReactiveSubscription; * @param value type * @param entry type */ -public class RedissonMapReactiveIterator { +public class MapReactiveIterator { private final RedissonMap map; private final String pattern; private final int count; - public RedissonMapReactiveIterator(RedissonMap map, String pattern, int count) { + public MapReactiveIterator(RedissonMap map, String pattern, int count) { this.map = map; this.pattern = pattern; this.count = count; @@ -69,7 +70,7 @@ public class RedissonMapReactiveIterator { protected void nextValues() { final ReactiveSubscription m = this; - map.scanIteratorAsync(map.getName(), client, nextIterPos, pattern, count).addListener(new FutureListener>() { + scanIterator(client, nextIterPos).addListener(new FutureListener>() { @Override public void operationComplete(Future> future) @@ -129,4 +130,8 @@ public class RedissonMapReactiveIterator { }; } + public RFuture> scanIterator(RedisClient client, long nextIterPos) { + return map.scanIteratorAsync(map.getName(), client, nextIterPos, pattern, count); + } + } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java index fd104c204..6473b8a6c 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java @@ -38,11 +38,11 @@ import reactor.rx.subscription.ReactiveSubscription; */ public class RedissonKeysReactive { - private final CommandReactiveService commandExecutor; + private final CommandReactiveExecutor commandExecutor; private final RedissonKeys instance; - public RedissonKeysReactive(CommandReactiveService commandExecutor) { + public RedissonKeysReactive(CommandReactiveExecutor commandExecutor) { super(); instance = new RedissonKeys(commandExecutor); this.commandExecutor = commandExecutor; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index 45bd226f4..85a6319f8 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -50,7 +50,7 @@ public class RedissonMapCacheReactive { } public Publisher> entryIterator(String pattern, int count) { - return new RedissonMapReactiveIterator>((RedissonMap) mapCache, pattern, count).stream(); + return new MapReactiveIterator>((RedissonMap) mapCache, pattern, count).stream(); } public Publisher valueIterator() { @@ -66,7 +66,7 @@ public class RedissonMapCacheReactive { } public Publisher valueIterator(String pattern, int count) { - return new RedissonMapReactiveIterator((RedissonMap) mapCache, pattern, count) { + return new MapReactiveIterator((RedissonMap) mapCache, pattern, count) { @Override V getValue(Entry entry) { return (V) entry.getValue(); @@ -87,7 +87,7 @@ public class RedissonMapCacheReactive { } public Publisher keyIterator(String pattern, int count) { - return new RedissonMapReactiveIterator((RedissonMap) mapCache, pattern, count) { + return new MapReactiveIterator((RedissonMap) mapCache, pattern, count) { @Override K getValue(Entry entry) { return (K) entry.getKey(); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java index de6897314..fe8847dc6 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -60,7 +60,7 @@ public class RedissonMapReactive { } public Publisher> entryIterator(String pattern, int count) { - return new RedissonMapReactiveIterator>((RedissonMap) instance, pattern, count).stream(); + return new MapReactiveIterator>((RedissonMap) instance, pattern, count).stream(); } public Publisher valueIterator() { @@ -76,7 +76,7 @@ public class RedissonMapReactive { } public Publisher valueIterator(String pattern, int count) { - return new RedissonMapReactiveIterator((RedissonMap) instance, pattern, count) { + return new MapReactiveIterator((RedissonMap) instance, pattern, count) { @Override V getValue(Entry entry) { return (V) entry.getValue(); @@ -97,7 +97,7 @@ public class RedissonMapReactive { } public Publisher keyIterator(String pattern, int count) { - return new RedissonMapReactiveIterator((RedissonMap) instance, pattern, count) { + return new MapReactiveIterator((RedissonMap) instance, pattern, count) { @Override K getValue(Entry entry) { return (K) entry.getKey();