diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java index 21fea73a6..46ef09c7a 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -49,7 +49,7 @@ public interface CommandAsyncExecutor { RedisException convertException(ExecutionException e); - void transfer(CompletableFuture future1, CompletableFuture future2); + void transfer(CompletionStage future1, CompletableFuture future2); V getNow(CompletableFuture future); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 043ed5aff..aaadab268 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -89,7 +89,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } @Override - public void transfer(CompletableFuture future1, CompletableFuture future2) { + public void transfer(CompletionStage future1, CompletableFuture future2) { future1.whenComplete((res, e) -> { if (e != null) { future2.completeExceptionally(e); diff --git a/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java b/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java index 818e54dbe..48e35ee78 100644 --- a/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java +++ b/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java @@ -24,10 +24,12 @@ import org.redisson.command.CommandBatchService; import org.redisson.connection.ConnectionManager; import org.redisson.connection.NodeSource; import org.redisson.liveobject.core.RedissonObjectBuilder; +import org.redisson.misc.CompletableFutureWrapper; import reactor.core.publisher.Mono; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -46,17 +48,14 @@ public class CommandReactiveBatchService extends CommandReactiveService { @Override public Mono reactive(Callable> supplier) { Mono mono = super.reactive(new Callable>() { - volatile RFuture future; + final CompletableFuture future = new CompletableFuture<>(); + final AtomicBoolean lock = new AtomicBoolean(); @Override public RFuture call() throws Exception { - if (future == null) { - synchronized (this) { - if (future == null) { - future = supplier.call(); - } - } + if (lock.compareAndSet(false, true)) { + transfer(supplier.call().toCompletableFuture(), future); } - return future; + return new CompletableFutureWrapper<>(future); } }); mono.subscribe(); diff --git a/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java b/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java index f46cce5f7..f7440bd85 100644 --- a/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java +++ b/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java @@ -26,9 +26,11 @@ import org.redisson.command.CommandBatchService; import org.redisson.connection.ConnectionManager; import org.redisson.connection.NodeSource; import org.redisson.liveobject.core.RedissonObjectBuilder; +import org.redisson.misc.CompletableFutureWrapper; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -47,17 +49,14 @@ public class CommandRxBatchService extends CommandRxService { @Override public Flowable flowable(Callable> supplier) { Flowable flowable = super.flowable(new Callable>() { - volatile RFuture future; + final CompletableFuture future = new CompletableFuture<>(); + final AtomicBoolean lock = new AtomicBoolean(); @Override public RFuture call() throws Exception { - if (future == null) { - synchronized (this) { - if (future == null) { - future = supplier.call(); - } - } + if (lock.compareAndSet(false, true)) { + transfer(supplier.call().toCompletableFuture(), future); } - return future; + return new CompletableFutureWrapper<>(future); } }); flowable.subscribe(); diff --git a/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java b/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java index d7670697d..ca08c7deb 100644 --- a/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java +++ b/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java @@ -99,7 +99,7 @@ public class RedissonBatchRxTest extends BaseRxTest { @ParameterizedTest @MethodSource("data") public void testPerformance(BatchOptions batchOptions) { - Assertions.assertTimeout(Duration.ofSeconds(20), () -> { + Assertions.assertTimeout(Duration.ofSeconds(21), () -> { RMapRx map = redisson.getMap("map"); Map m = new HashMap(); for (int j = 0; j < 1000; j++) {