From 53e4c223d674cc59e2301f89e2dde22b29bea83d Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 28 Aug 2019 13:12:50 +0300 Subject: [PATCH] Fixed - Redisson couldn't be shutdown if one of RBatch commands was canceled. #2275 --- .../command/BaseRedisBatchExecutor.java | 10 ++----- .../org/redisson/command/BatchPromise.java | 5 ++++ .../redisson/command/CommandBatchService.java | 20 ++++++++----- .../redisson/command/RedisBatchExecutor.java | 8 ++---- .../command/RedisCommonBatchExecutor.java | 13 +++++++-- .../command/RedisQueuedBatchExecutor.java | 8 +++++- .../java/org/redisson/RedissonBatchTest.java | 28 +++++++++++++++++++ 7 files changed, 69 insertions(+), 23 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/BaseRedisBatchExecutor.java b/redisson/src/main/java/org/redisson/command/BaseRedisBatchExecutor.java index c7bd48445..94275c2c3 100644 --- a/redisson/src/main/java/org/redisson/command/BaseRedisBatchExecutor.java +++ b/redisson/src/main/java/org/redisson/command/BaseRedisBatchExecutor.java @@ -23,14 +23,12 @@ import org.redisson.api.BatchOptions; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.BatchCommandData; import org.redisson.client.protocol.RedisCommand; -import org.redisson.command.CommandBatchService.ConnectionEntry; import org.redisson.command.CommandBatchService.Entry; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; import org.redisson.liveobject.core.RedissonObjectBuilder; import org.redisson.misc.RPromise; -import org.redisson.pubsub.AsyncSemaphore; /** * @@ -42,28 +40,24 @@ import org.redisson.pubsub.AsyncSemaphore; public class BaseRedisBatchExecutor extends RedisExecutor { final ConcurrentMap commands; - final ConcurrentMap connections; final BatchOptions options; final AtomicInteger index; final AtomicBoolean executed; - final AsyncSemaphore semaphore; @SuppressWarnings("ParameterNumber") public BaseRedisBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, - ConcurrentMap commands, ConcurrentMap connections, - BatchOptions options, AtomicInteger index, AtomicBoolean executed, AsyncSemaphore semaphore) { + ConcurrentMap commands, + BatchOptions options, AtomicInteger index, AtomicBoolean executed) { super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder); this.commands = commands; - this.connections = connections; this.options = options; this.index = index; this.executed = executed; - this.semaphore = semaphore; } protected final void addBatchCommandData(Object[] batchParams) { diff --git a/redisson/src/main/java/org/redisson/command/BatchPromise.java b/redisson/src/main/java/org/redisson/command/BatchPromise.java index 9b3885b26..5c02425a1 100644 --- a/redisson/src/main/java/org/redisson/command/BatchPromise.java +++ b/redisson/src/main/java/org/redisson/command/BatchPromise.java @@ -40,6 +40,11 @@ public class BatchPromise extends RedissonPromise { return sentPromise; } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + @Override public RPromise sync() throws InterruptedException { if (executed.get()) { diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index b7f546a4a..612adfa89 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -140,7 +140,7 @@ public class CommandBatchService extends CommandAsyncService { executor.execute(); } else { RedisExecutor executor = new RedisBatchExecutor<>(readOnlyMode, nodeSource, codec, command, params, mainPromise, - true, connectionManager, objectBuilder, commands, connections, options, index, executed, semaphore); + true, connectionManager, objectBuilder, commands, options, index, executed); executor.execute(); } @@ -149,9 +149,10 @@ public class CommandBatchService extends CommandAsyncService { @Override public RPromise createPromise() { if (isRedisBasedQueue()) { - return new BatchPromise(executed); + return new BatchPromise<>(executed); } - return super.createPromise(); + + return new RedissonPromise<>(); } public BatchResult execute() { @@ -258,6 +259,11 @@ public class CommandBatchService extends CommandAsyncService { syncedSlaves = (Integer) commandEntry.getPromise().getNow(); } else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName()) && !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) { + + if (commandEntry.getPromise().isCancelled()) { + continue; + } + Object entryResult = commandEntry.getPromise().getNow(); try { entryResult = RedisExecutor.tryHandleReference(objectBuilder, entryResult); @@ -321,12 +327,12 @@ public class CommandBatchService extends CommandAsyncService { return; } - RPromise>> mainPromise = new RedissonPromise>>(); - Map> result = new ConcurrentHashMap>(); - CountableListener>> listener = new CountableListener>>(mainPromise, result); + RPromise>> mainPromise = new RedissonPromise<>(); + Map> result = new ConcurrentHashMap<>(); + CountableListener>> listener = new CountableListener<>(mainPromise, result); listener.setCounter(connections.size()); for (Map.Entry entry : commands.entrySet()) { - RPromise> execPromise = new RedissonPromise>(); + RPromise> execPromise = new RedissonPromise<>(); async(entry.getValue().isReadOnlyMode(), new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC, new Object[] {}, execPromise, false); execPromise.onComplete((r, ex) -> { diff --git a/redisson/src/main/java/org/redisson/command/RedisBatchExecutor.java b/redisson/src/main/java/org/redisson/command/RedisBatchExecutor.java index a7626e9db..f397a461d 100644 --- a/redisson/src/main/java/org/redisson/command/RedisBatchExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisBatchExecutor.java @@ -22,14 +22,12 @@ import java.util.concurrent.atomic.AtomicInteger; import org.redisson.api.BatchOptions; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; -import org.redisson.command.CommandBatchService.ConnectionEntry; import org.redisson.command.CommandBatchService.Entry; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; import org.redisson.liveobject.core.RedissonObjectBuilder; import org.redisson.misc.RPromise; -import org.redisson.pubsub.AsyncSemaphore; /** * @@ -44,10 +42,10 @@ public class RedisBatchExecutor extends BaseRedisBatchExecutor { public RedisBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, ConcurrentMap commands, - ConcurrentMap connections, BatchOptions options, AtomicInteger index, - AtomicBoolean executed, AsyncSemaphore semaphore) { + BatchOptions options, AtomicInteger index, + AtomicBoolean executed) { super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder, - commands, connections, options, index, executed, semaphore); + commands, options, index, executed); } @Override diff --git a/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java b/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java index c21b59706..5da110a70 100644 --- a/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java @@ -96,13 +96,22 @@ public class RedisCommonBatchExecutor extends RedisExecutor { list.add(new CommandData(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {})); } for (CommandData c : entry.getCommands()) { - if (c.getPromise().isSuccess() && !isWaitCommand(c) && !isAtomic) { - // skip successful commands + if ((c.getPromise().isCancelled() || c.getPromise().isSuccess()) + && !isWaitCommand(c) + && !isAtomic) { + // skip command continue; } list.add(c); } + if (list.isEmpty()) { + writeFuture = connection.getChannel().newPromise(); + attemptPromise.trySuccess(null); + timeout.cancel(); + return; + } + writeFuture = connection.send(new CommandsData(attemptPromise, list, options.isSkipResult(), isAtomic, isQueued)); } diff --git a/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java b/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java index 01744601f..817833eb7 100644 --- a/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java @@ -52,6 +52,9 @@ import org.redisson.pubsub.AsyncSemaphore; */ public class RedisQueuedBatchExecutor extends BaseRedisBatchExecutor { + private final ConcurrentMap connections; + private final AsyncSemaphore semaphore; + @SuppressWarnings("ParameterNumber") public RedisQueuedBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager, @@ -59,7 +62,10 @@ public class RedisQueuedBatchExecutor extends BaseRedisBatchExecutor ConcurrentMap connections, BatchOptions options, AtomicInteger index, AtomicBoolean executed, AsyncSemaphore semaphore) { super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder, - commands, connections, options, index, executed, semaphore); + commands, options, index, executed); + + this.connections = connections; + this.semaphore = semaphore; } @Override diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index 813d8d529..9802a982c 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -381,6 +381,34 @@ public class RedissonBatchTest extends BaseTest { Assert.assertEquals(539, res.getResponses().size()); } + @Test + public void testBatchCancel() { + RedissonClient redisson = createInstance(); + + BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.IN_MEMORY); + RBatch batch = redisson.createBatch(batchOptions); + for (int i = 0; i < 10; i++) { + RFuture f = batch.getBucket("test").setAsync(123); + assertThat(f.cancel(true)).isTrue(); + } + + BatchResult res = batch.execute(); + Assert.assertEquals(0, res.getResponses().size()); + + RBatch b2 = redisson.createBatch(batchOptions); + RListAsync listAsync2 = b2.getList("list"); + for (int i = 0; i < 6; i++) { + RFuture t = listAsync2.addAsync(i); + assertThat(t.cancel(true)).isTrue(); + } + + RFuture> res2 = b2.executeAsync(); + assertThat(res2.cancel(true)).isFalse(); + Assert.assertEquals(0, res.getResponses().size()); + + redisson.shutdown(); + } + @Test public void testBatchBigRequest() { Config config = createConfig();