From b7254265ccba4f6b34f19b54f72744176e038d0b Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 13 Jul 2016 19:58:15 +0300 Subject: [PATCH] cluster commands handling regression bug fixed --- .../redisson/command/CommandAsyncService.java | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index bcac77fb5..0f73d4d63 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -133,14 +133,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { public Future> readAllAsync(RedisCommand command, Object ... params) { final Promise> mainPromise = connectionManager.newPromise(); final Set nodes = connectionManager.getEntrySet(); - Promise promise = connectionManager.newPromise(); final List results = new ArrayList(); final AtomicInteger counter = new AtomicInteger(nodes.size()); - promise.addListener(new FutureListener() { + FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - mainPromise.setFailure(future.cause()); + mainPromise.tryFailure(future.cause()); return; } @@ -160,9 +159,11 @@ public class CommandAsyncService implements CommandAsyncExecutor { mainPromise.setSuccess(results); } } - }); + }; for (MasterSlaveEntry entry : nodes) { + Promise promise = connectionManager.newPromise(); + promise.addListener(listener); async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0); } return mainPromise; @@ -222,13 +223,12 @@ public class CommandAsyncService implements CommandAsyncExecutor { private Future allAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, Object ... params) { final Promise mainPromise = connectionManager.newPromise(); final Set nodes = connectionManager.getEntrySet(); - Promise promise = connectionManager.newPromise(); final AtomicInteger counter = new AtomicInteger(nodes.size()); - promise.addListener(new FutureListener() { + FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - mainPromise.setFailure(future.cause()); + mainPromise.tryFailure(future.cause()); return; } @@ -243,9 +243,11 @@ public class CommandAsyncService implements CommandAsyncExecutor { } } } - }); + }; for (MasterSlaveEntry entry : nodes) { + Promise promise = connectionManager.newPromise(); + promise.addListener(listener); async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0); } return mainPromise; @@ -348,14 +350,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { public Future evalAllAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, String script, List keys, Object ... params) { final Promise mainPromise = connectionManager.newPromise(); final Set entries = connectionManager.getEntrySet(); - Promise promise = connectionManager.newPromise(); final AtomicInteger counter = new AtomicInteger(entries.size()); - promise.addListener(new FutureListener() { + FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - mainPromise.setFailure(future.cause()); + mainPromise.tryFailure(future.cause()); return; } @@ -365,7 +366,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { mainPromise.setSuccess(callback.onFinish()); } } - }); + }; List args = new ArrayList(2 + keys.size() + params.length); args.add(script); @@ -373,6 +374,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { args.addAll(keys); args.addAll(Arrays.asList(params)); for (MasterSlaveEntry entry : entries) { + Promise promise = connectionManager.newPromise(); + promise.addListener(listener); async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0); } return mainPromise;