From 2bac289bc28ac3f475c2f25ca38c874d0f540dcc Mon Sep 17 00:00:00 2001
From: Nikita Koksharov <nkoksharov@redisson.pro>
Date: Tue, 2 May 2023 08:24:24 +0300
Subject: [PATCH] refactoring

---
 .../redisson/command/CommandAsyncExecutor.java    |  2 +-
 .../org/redisson/command/CommandAsyncService.java |  2 +-
 .../reactive/CommandReactiveBatchService.java     | 15 +++++++--------
 .../org/redisson/rx/CommandRxBatchService.java    | 15 +++++++--------
 .../java/org/redisson/rx/RedissonBatchRxTest.java |  2 +-
 5 files changed, 17 insertions(+), 19 deletions(-)

diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java
index 21fea73a6..46ef09c7a 100644
--- a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java
+++ b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java
@@ -49,7 +49,7 @@ public interface CommandAsyncExecutor {
 
     RedisException convertException(ExecutionException e);
 
-    <V> void transfer(CompletableFuture<V> future1, CompletableFuture<V> future2);
+    <V> void transfer(CompletionStage<V> future1, CompletableFuture<V> future2);
 
     <V> V getNow(CompletableFuture<V> future);
 
diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java
index 043ed5aff..aaadab268 100644
--- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java
+++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java
@@ -89,7 +89,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
     }
 
     @Override
-    public <V> void transfer(CompletableFuture<V> future1, CompletableFuture<V> future2) {
+    public <V> void transfer(CompletionStage<V> future1, CompletableFuture<V> future2) {
         future1.whenComplete((res, e) -> {
             if (e != null) {
                 future2.completeExceptionally(e);
diff --git a/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java b/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java
index 818e54dbe..48e35ee78 100644
--- a/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java
+++ b/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java
@@ -24,10 +24,12 @@ import org.redisson.command.CommandBatchService;
 import org.redisson.connection.ConnectionManager;
 import org.redisson.connection.NodeSource;
 import org.redisson.liveobject.core.RedissonObjectBuilder;
+import org.redisson.misc.CompletableFutureWrapper;
 import reactor.core.publisher.Mono;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * 
@@ -46,17 +48,14 @@ public class CommandReactiveBatchService extends CommandReactiveService {
     @Override
     public <R> Mono<R> reactive(Callable<RFuture<R>> supplier) {
         Mono<R> mono = super.reactive(new Callable<RFuture<R>>() {
-            volatile RFuture<R> future;
+            final CompletableFuture<R> future = new CompletableFuture<>();
+            final AtomicBoolean lock = new AtomicBoolean();
             @Override
             public RFuture<R> call() throws Exception {
-                if (future == null) {
-                    synchronized (this) {
-                        if (future == null) {
-                            future = supplier.call();
-                        }
-                    }
+                if (lock.compareAndSet(false, true)) {
+                    transfer(supplier.call().toCompletableFuture(), future);
                 }
-                return future;
+                return new CompletableFutureWrapper<>(future);
             }
         });
         mono.subscribe();
diff --git a/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java b/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java
index f46cce5f7..f7440bd85 100644
--- a/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java
+++ b/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java
@@ -26,9 +26,11 @@ import org.redisson.command.CommandBatchService;
 import org.redisson.connection.ConnectionManager;
 import org.redisson.connection.NodeSource;
 import org.redisson.liveobject.core.RedissonObjectBuilder;
+import org.redisson.misc.CompletableFutureWrapper;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * 
@@ -47,17 +49,14 @@ public class CommandRxBatchService extends CommandRxService {
     @Override
     public <R> Flowable<R> flowable(Callable<RFuture<R>> supplier) {
         Flowable<R> flowable = super.flowable(new Callable<RFuture<R>>() {
-            volatile RFuture<R> future;
+            final CompletableFuture<R> future = new CompletableFuture<>();
+            final AtomicBoolean lock = new AtomicBoolean();
             @Override
             public RFuture<R> call() throws Exception {
-                if (future == null) {
-                    synchronized (this) {
-                        if (future == null) {
-                            future = supplier.call();
-                        }
-                    }
+                if (lock.compareAndSet(false, true)) {
+                    transfer(supplier.call().toCompletableFuture(), future);
                 }
-                return future;
+                return new CompletableFutureWrapper<>(future);
             }
         });
         flowable.subscribe();
diff --git a/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java b/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java
index d7670697d..ca08c7deb 100644
--- a/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java
+++ b/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java
@@ -99,7 +99,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
     @ParameterizedTest
     @MethodSource("data")
     public void testPerformance(BatchOptions batchOptions) {
-        Assertions.assertTimeout(Duration.ofSeconds(20), () -> {
+        Assertions.assertTimeout(Duration.ofSeconds(21), () -> {
             RMapRx<String, String> map = redisson.getMap("map");
             Map<String, String> m = new HashMap<String, String>();
             for (int j = 0; j < 1000; j++) {