diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java b/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java index a7849cf12..a15d94966 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java @@ -57,6 +57,10 @@ public class CommandReactiveBatchService extends CommandReactiveService { return publisher; } + public Publisher superReactive(Supplier> supplier) { + return super.reactive(supplier); + } + @Override protected void async(boolean readOnlyMode, NodeSource nodeSource, Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect, RFuture connFuture) { diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveService.java b/redisson/src/main/java/org/redisson/command/CommandReactiveService.java index dc36e7cd0..84dc2422b 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveService.java +++ b/redisson/src/main/java/org/redisson/command/CommandReactiveService.java @@ -33,6 +33,7 @@ public class CommandReactiveService extends CommandAsyncService implements Comma super(connectionManager); } + @Override public Publisher reactive(Supplier> supplier) { return new NettyFuturePublisher(supplier); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index 3d58fd83d..126c733b2 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -71,7 +71,6 @@ import org.redisson.api.RStreamReactive; import org.redisson.api.RTopicReactive; import org.redisson.api.RedissonReactiveClient; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.StringCodec; import org.redisson.command.CommandReactiveBatchService; import org.redisson.connection.ConnectionManager; import org.redisson.eviction.EvictionScheduler; @@ -281,7 +280,7 @@ public class RedissonBatchReactive implements RBatchReactive { @Override public Publisher> execute() { - return new NettyFuturePublisher>(new Supplier>>() { + return executorService.superReactive(new Supplier>>() { @Override public RFuture> get() { return executorService.executeAsync(options); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java index 40aceb47a..4b44e547c 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java @@ -70,7 +70,7 @@ public class RedissonPatternTopicReactive implements RPatternTopicReactive @Override public Publisher addListener(final PatternStatusListener listener) { - return new NettyFuturePublisher(new Supplier>() { + return commandExecutor.reactive(new Supplier>() { @Override public RFuture get() { RPromise promise = new RedissonPromise(); @@ -82,7 +82,7 @@ public class RedissonPatternTopicReactive implements RPatternTopicReactive @Override public Publisher addListener(final PatternMessageListener listener) { - return new NettyFuturePublisher(new Supplier>() { + return commandExecutor.reactive(new Supplier>() { @Override public RFuture get() { RPromise promise = new RedissonPromise(); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java index 3071e261c..896d2ae6b 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java @@ -118,7 +118,7 @@ public class RedissonTransactionReactive implements RTransactionReactive { @Override public Publisher commit() { - return new NettyFuturePublisher(new Supplier>() { + return executorService.reactive(new Supplier>() { @Override public RFuture get() { return transaction.commitAsync(); @@ -128,7 +128,7 @@ public class RedissonTransactionReactive implements RTransactionReactive { @Override public Publisher rollback() { - return new NettyFuturePublisher(new Supplier>() { + return executorService.reactive(new Supplier>() { @Override public RFuture get() { return transaction.rollbackAsync();