diff --git a/src/main/java/org/redisson/command/CommandReactiveExecutor.java b/src/main/java/org/redisson/command/CommandReactiveExecutor.java index 318a01487..6232e584b 100644 --- a/src/main/java/org/redisson/command/CommandReactiveExecutor.java +++ b/src/main/java/org/redisson/command/CommandReactiveExecutor.java @@ -25,6 +25,8 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; +import io.netty.util.concurrent.Future; + /** * * @author Nikita Koksharov @@ -32,6 +34,8 @@ import org.redisson.connection.ConnectionManager; */ public interface CommandReactiveExecutor extends CommandAsyncExecutor { + Publisher reactive(Future future); + ConnectionManager getConnectionManager(); Publisher evalReadReactive(InetSocketAddress client, String key, Codec codec, RedisCommand evalCommandType, diff --git a/src/main/java/org/redisson/command/CommandReactiveService.java b/src/main/java/org/redisson/command/CommandReactiveService.java index 23a599b9d..a79f6a716 100644 --- a/src/main/java/org/redisson/command/CommandReactiveService.java +++ b/src/main/java/org/redisson/command/CommandReactiveService.java @@ -45,6 +45,10 @@ public class CommandReactiveService extends CommandAsyncService implements Comma return new NettyFuturePublisher(f); } + public Publisher reactive(Future future) { + return new NettyFuturePublisher(future); + } + @Override public Publisher> readAllReactive(RedisCommand command, Object ... params) { Future> f = readAllAsync(command, params); diff --git a/src/main/java/org/redisson/reactive/RedissonKeysReactive.java b/src/main/java/org/redisson/reactive/RedissonKeysReactive.java index ac49e1fe2..e54ce5273 100644 --- a/src/main/java/org/redisson/reactive/RedissonKeysReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonKeysReactive.java @@ -18,14 +18,12 @@ package org.redisson.reactive; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; -import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import org.redisson.SlotCallback; +import org.redisson.RedissonKeys; import org.redisson.api.RKeysReactive; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; @@ -41,14 +39,17 @@ public class RedissonKeysReactive implements RKeysReactive { private final CommandReactiveService commandExecutor; + RedissonKeys instance; + public RedissonKeysReactive(CommandReactiveService commandExecutor) { super(); + instance = new RedissonKeys(commandExecutor); this.commandExecutor = commandExecutor; } @Override public Publisher getSlot(String key) { - return commandExecutor.readReactive(null, RedisCommands.KEYSLOT, key); + return commandExecutor.reactive(instance.getSlotAsync(key)); } @Override @@ -153,59 +154,32 @@ public class RedissonKeysReactive implements RKeysReactive { @Override public Publisher> findKeysByPattern(String pattern) { - return commandExecutor.readAllReactive(RedisCommands.KEYS, pattern); + return commandExecutor.reactive(instance.findKeysByPatternAsync(pattern)); } @Override public Publisher randomKey() { - return commandExecutor.readRandomReactive(RedisCommands.RANDOM_KEY); + return commandExecutor.reactive(instance.randomKeyAsync()); } @Override public Publisher deleteByPattern(String pattern) { - return commandExecutor.evalWriteAllReactive(RedisCommands.EVAL_LONG, new SlotCallback() { - AtomicLong results = new AtomicLong(); - @Override - public void onSlotResult(Long result) { - results.addAndGet(result); - } - - @Override - public Long onFinish() { - return results.get(); - } - }, "local keys = redis.call('keys', ARGV[1]) " - + "local n = 0 " - + "for i=1, table.getn(keys),5000 do " - + "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) " - + "end " - + "return n;",Collections.emptyList(), pattern); + return commandExecutor.reactive(instance.deleteByPatternAsync(pattern)); } @Override public Publisher delete(String ... keys) { - return commandExecutor.writeAllReactive(RedisCommands.DEL, new SlotCallback() { - AtomicLong results = new AtomicLong(); - @Override - public void onSlotResult(Long result) { - results.addAndGet(result); - } - - @Override - public Long onFinish() { - return results.get(); - } - }, (Object[])keys); + return commandExecutor.reactive(instance.deleteAsync(keys)); } @Override public Publisher flushdb() { - return commandExecutor.writeAllReactive(RedisCommands.FLUSHDB); + return commandExecutor.reactive(instance.flushdbAsync()); } @Override public Publisher flushall() { - return commandExecutor.writeAllReactive(RedisCommands.FLUSHALL); + return commandExecutor.reactive(instance.flushallAsync()); } }