From bd74e07fdbbe3399d808d2b332549578fd9c634e Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 15 Mar 2019 12:02:08 +0300 Subject: [PATCH] Fixed - ClassCastException during RBatchReactive and RBatchRx execution in exectionMode = REDIS_WRITE_ATOMIC or REDIS_READ_ATOMIC. #1661 --- .../redisson/command/CommandBatchService.java | 2 +- .../reactive/CommandReactiveBatchService.java | 37 +++++++++++-------- .../reactive/RedissonBatchReactive.java | 11 +----- .../redisson/rx/CommandRxBatchService.java | 20 +++++----- .../java/org/redisson/rx/RedissonBatchRx.java | 2 +- .../org/redisson/rx/RedissonBatchRxTest.java | 14 ++++--- 6 files changed, 42 insertions(+), 44 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 6b201e9fc..cb318d040 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -188,7 +188,7 @@ public class CommandBatchService extends CommandAsyncService { AsyncSemaphore semaphore = new AsyncSemaphore(0); @Override - protected RPromise createPromise() { + public RPromise createPromise() { if (isRedisBasedQueue()) { return new BatchPromise(executed); } diff --git a/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java b/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java index 9c5069fe3..cac2ab014 100644 --- a/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java +++ b/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java @@ -15,11 +15,8 @@ */ package org.redisson.reactive; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Supplier; -import org.reactivestreams.Publisher; import org.redisson.api.BatchOptions; import org.redisson.api.BatchResult; import org.redisson.api.RFuture; @@ -32,7 +29,6 @@ import org.redisson.connection.ConnectionManager; import org.redisson.connection.NodeSource; import org.redisson.misc.RPromise; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -43,22 +39,35 @@ import reactor.core.publisher.Mono; public class CommandReactiveBatchService extends CommandReactiveService { private final CommandBatchService batchService; - private final Queue> publishers = new ConcurrentLinkedQueue>(); - public CommandReactiveBatchService(ConnectionManager connectionManager) { + public CommandReactiveBatchService(ConnectionManager connectionManager, BatchOptions options) { super(connectionManager); - batchService = new CommandBatchService(connectionManager); + batchService = new CommandBatchService(connectionManager, options); } @Override public Mono reactive(Supplier> supplier) { - Mono publisher = super.reactive(supplier); - publishers.add(publisher); - return publisher; + Mono mono = super.reactive(new Supplier>() { + volatile RFuture future; + @Override + public RFuture get() { + if (future == null) { + synchronized (this) { + if (future == null) { + future = supplier.get(); + } + } + } + return future; + } + }); + mono.subscribe(); + return mono; } - public Publisher superReactive(Supplier> supplier) { - return super.reactive(supplier); + @Override + protected RPromise createPromise() { + return batchService.createPromise(); } @Override @@ -68,10 +77,6 @@ public class CommandReactiveBatchService extends CommandReactiveService { } public RFuture> executeAsync(BatchOptions options) { - for (Publisher publisher : publishers) { - Flux.from(publisher).subscribe(); - } - return batchService.executeAsync(options); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index 65685055a..70cb28909 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -16,7 +16,6 @@ package org.redisson.reactive; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import org.redisson.RedissonAtomicDouble; import org.redisson.RedissonAtomicLong; @@ -51,7 +50,6 @@ import org.redisson.api.RBlockingDequeReactive; import org.redisson.api.RBlockingQueueReactive; import org.redisson.api.RBucketReactive; import org.redisson.api.RDequeReactive; -import org.redisson.api.RFuture; import org.redisson.api.RGeoReactive; import org.redisson.api.RHyperLogLogReactive; import org.redisson.api.RKeysReactive; @@ -91,7 +89,7 @@ public class RedissonBatchReactive implements RBatchReactive { public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, CommandReactiveService commandExecutor, BatchOptions options) { this.evictionScheduler = evictionScheduler; - this.executorService = new CommandReactiveBatchService(connectionManager); + this.executorService = new CommandReactiveBatchService(connectionManager, options); this.commandExecutor = commandExecutor; this.options = options; } @@ -287,12 +285,7 @@ public class RedissonBatchReactive implements RBatchReactive { @Override public Mono> execute() { - return commandExecutor.reactive(new Supplier>>() { - @Override - public RFuture> get() { - return executorService.executeAsync(options); - } - }); + return commandExecutor.reactive(() -> executorService.executeAsync(options)); } public RBatchReactive atomic() { diff --git a/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java b/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java index e03e5deea..3acacac35 100644 --- a/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java +++ b/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java @@ -15,9 +15,7 @@ */ package org.redisson.rx; -import java.util.Queue; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; import org.redisson.api.BatchOptions; import org.redisson.api.BatchResult; @@ -41,17 +39,16 @@ import io.reactivex.Flowable; public class CommandRxBatchService extends CommandRxService { private final CommandBatchService batchService; - private final Queue> publishers = new ConcurrentLinkedQueue<>(); - public CommandRxBatchService(ConnectionManager connectionManager) { + public CommandRxBatchService(ConnectionManager connectionManager, BatchOptions options) { super(connectionManager); - batchService = new CommandBatchService(connectionManager); + batchService = new CommandBatchService(connectionManager, options); } @Override public Flowable flowable(Callable> supplier) { Flowable flowable = super.flowable(new Callable>() { - RFuture future; + volatile RFuture future; @Override public RFuture call() throws Exception { if (future == null) { @@ -64,10 +61,15 @@ public class CommandRxBatchService extends CommandRxService { return future; } }); - publishers.add(flowable); + flowable.subscribe(); return flowable; } + @Override + protected RPromise createPromise() { + return batchService.createPromise(); + } + @Override public void async(boolean readOnlyMode, NodeSource nodeSource, Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect) { @@ -75,10 +77,6 @@ public class CommandRxBatchService extends CommandRxService { } public RFuture> executeAsync(BatchOptions options) { - for (Flowable element : publishers) { - element.subscribe(); - } - return batchService.executeAsync(options); } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java b/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java index 670d59dc3..a5322e442 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java @@ -92,7 +92,7 @@ public class RedissonBatchRx implements RBatchRx { public RedissonBatchRx(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, CommandRxExecutor commandExecutor, BatchOptions options) { this.evictionScheduler = evictionScheduler; - this.executorService = new CommandRxBatchService(connectionManager); + this.executorService = new CommandRxBatchService(connectionManager, options); this.commandExecutor = commandExecutor; this.options = options; } diff --git a/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java b/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java index 3ddccff2f..16f8e2a6f 100644 --- a/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java +++ b/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java @@ -53,7 +53,7 @@ public class RedissonBatchRxTest extends BaseRxTest { public static Iterable data() { return Arrays.asList(new Object[][] { {BatchOptions.defaults().executionMode(ExecutionMode.IN_MEMORY)}, -// {BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC)} + {BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC)} }); } @@ -131,7 +131,7 @@ public class RedissonBatchRxTest extends BaseRxTest { } } -// @Test + @Test public void testConnectionLeakAfterError() throws InterruptedException { Config config = BaseTest.createConfig(); config.useSingleServer() @@ -143,7 +143,7 @@ public class RedissonBatchRxTest extends BaseRxTest { BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC); RBatchRx batch = redisson.createBatch(batchOptions); - for (int i = 0; i < 3000; i++) { + for (int i = 0; i < 300000; i++) { batch.getBucket("test").set(123); } @@ -227,7 +227,7 @@ public class RedissonBatchRxTest extends BaseRxTest { } @Test - public void testWriteTimeout() { + public void testWriteTimeout() throws InterruptedException { Config config = BaseTest.createConfig(); config.useSingleServer().setTimeout(3000); RedissonRxClient redisson = Redisson.createRx(config); @@ -236,9 +236,11 @@ public class RedissonBatchRxTest extends BaseRxTest { RMapCacheRx map = batch.getMapCache("test"); int total = 200000; for (int i = 0; i < total; i++) { - Maybe f = map.put("" + i, "" + i, 5, TimeUnit.MINUTES); + map.put("" + i, "" + i, 5, TimeUnit.MINUTES); if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { - f.blockingGet(); + if (i % 100 == 0) { + Thread.sleep(5); + } } }