From 1133f4a1dc4631a04c31415d46296552543f199a Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 13 Jun 2018 14:56:01 +0300 Subject: [PATCH] Fixed - error handling during RBatch execution in executionMode = REDIS_WRITE_ATOMIC/REDIS_READ_ATOMIC --- .../redisson/command/CommandBatchService.java | 60 ++++++++++--------- .../java/org/redisson/RedissonBatchTest.java | 13 +++- 2 files changed, 42 insertions(+), 31 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 4199aaf37..1c4961eb2 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -229,6 +229,7 @@ public class CommandBatchService extends CommandAsyncService { BatchPromise batchPromise = (BatchPromise) promise; RPromise sentPromise = (RPromise) batchPromise.getSentPromise(); super.handleError(sentPromise, cause); + super.handleError(promise, cause); semaphore.release(); return; } @@ -462,39 +463,42 @@ public class CommandBatchService extends CommandAsyncService { return; } - for (java.util.Map.Entry> entry : future.getNow().entrySet()) { - Entry commandEntry = commands.get(entry.getKey()); - Iterator resultIter = entry.getValue().iterator(); - for (BatchCommandData data : commandEntry.getCommands()) { - if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) { - break; + try { + for (java.util.Map.Entry> entry : future.getNow().entrySet()) { + Entry commandEntry = commands.get(entry.getKey()); + Iterator resultIter = entry.getValue().iterator(); + for (BatchCommandData data : commandEntry.getCommands()) { + if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) { + break; + } + RPromise promise = (RPromise) data.getPromise(); + promise.trySuccess(resultIter.next()); } - RPromise promise = (RPromise) data.getPromise(); - promise.trySuccess(resultIter.next()); } - } - - List entries = new ArrayList(); - for (Entry e : commands.values()) { - entries.addAll(e.getCommands()); - } - Collections.sort(entries); - List responses = new ArrayList(entries.size()); - int syncedSlaves = 0; - for (BatchCommandData commandEntry : entries) { - if (isWaitCommand(commandEntry)) { - syncedSlaves += (Integer) commandEntry.getPromise().getNow(); - } else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName()) - && !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) { - Object entryResult = commandEntry.getPromise().getNow(); - entryResult = tryHandleReference(entryResult); - responses.add(entryResult); + + List entries = new ArrayList(); + for (Entry e : commands.values()) { + entries.addAll(e.getCommands()); } + Collections.sort(entries); + List responses = new ArrayList(entries.size()); + int syncedSlaves = 0; + for (BatchCommandData commandEntry : entries) { + if (isWaitCommand(commandEntry)) { + syncedSlaves += (Integer) commandEntry.getPromise().getNow(); + } else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName()) + && !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) { + Object entryResult = commandEntry.getPromise().getNow(); + entryResult = tryHandleReference(entryResult); + responses.add(entryResult); + } + } + BatchResult result = new BatchResult(responses, syncedSlaves); + resultPromise.trySuccess((R)result); + } catch (Exception e) { + resultPromise.tryFailure(e); } - BatchResult result = new BatchResult(responses, syncedSlaves); - resultPromise.trySuccess((R)result); - commands = null; } }); diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index be1530de8..58cbf99b3 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -3,6 +3,7 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -28,6 +29,7 @@ import org.redisson.api.RBatch; import org.redisson.api.RFuture; import org.redisson.api.RListAsync; import org.redisson.api.RMapAsync; +import org.redisson.api.RMapCache; import org.redisson.api.RMapCacheAsync; import org.redisson.api.RScript; import org.redisson.api.RScript.Mode; @@ -135,15 +137,20 @@ public class RedissonBatchTest extends BaseTest { process.shutdown(); } - + @Test public void testWriteTimeout() { RBatch batch = redisson.createBatch(batchOptions); + RMapCacheAsync map = batch.getMapCache("test"); for (int i = 0; i < 200000; i++) { - RMapCacheAsync map = batch.getMapCache("test"); - map.putAsync("" + i, "" + i, 10, TimeUnit.SECONDS); + RFuture f = map.putAsync("" + i, "" + i, 5, TimeUnit.MINUTES); + if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { + f.syncUninterruptibly(); + } } + batch.execute(); + assertThat(redisson.getMapCache("test").size()).isEqualTo(200000); } @Test