cluster commands handling regression bug fixed

pull/555/head
Nikita 9 years ago
parent f70816254a
commit b7254265cc

@ -133,14 +133,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <T, R> Future<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params) { public <T, R> Future<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params) {
final Promise<Collection<R>> mainPromise = connectionManager.newPromise(); final Promise<Collection<R>> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> nodes = connectionManager.getEntrySet(); final Set<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
Promise<R> promise = connectionManager.newPromise();
final List<R> results = new ArrayList<R>(); final List<R> results = new ArrayList<R>();
final AtomicInteger counter = new AtomicInteger(nodes.size()); final AtomicInteger counter = new AtomicInteger(nodes.size());
promise.addListener(new FutureListener<R>() { FutureListener<R> listener = new FutureListener<R>() {
@Override @Override
public void operationComplete(Future<R> future) throws Exception { public void operationComplete(Future<R> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
mainPromise.setFailure(future.cause()); mainPromise.tryFailure(future.cause());
return; return;
} }
@ -160,9 +159,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
mainPromise.setSuccess(results); mainPromise.setSuccess(results);
} }
} }
}); };
for (MasterSlaveEntry entry : nodes) { for (MasterSlaveEntry entry : nodes) {
Promise<R> promise = connectionManager.newPromise();
promise.addListener(listener);
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0); async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0);
} }
return mainPromise; return mainPromise;
@ -222,13 +223,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private <T, R> Future<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object ... params) { private <T, R> Future<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object ... params) {
final Promise<R> mainPromise = connectionManager.newPromise(); final Promise<R> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> nodes = connectionManager.getEntrySet(); final Set<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
Promise<T> promise = connectionManager.newPromise();
final AtomicInteger counter = new AtomicInteger(nodes.size()); final AtomicInteger counter = new AtomicInteger(nodes.size());
promise.addListener(new FutureListener<T>() { FutureListener<T> listener = new FutureListener<T>() {
@Override @Override
public void operationComplete(Future<T> future) throws Exception { public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
mainPromise.setFailure(future.cause()); mainPromise.tryFailure(future.cause());
return; return;
} }
@ -243,9 +243,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
} }
} }
}); };
for (MasterSlaveEntry entry : nodes) { for (MasterSlaveEntry entry : nodes) {
Promise<T> promise = connectionManager.newPromise();
promise.addListener(listener);
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0); async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0);
} }
return mainPromise; return mainPromise;
@ -348,14 +350,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <T, R> Future<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) { public <T, R> Future<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
final Promise<R> mainPromise = connectionManager.newPromise(); final Promise<R> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> entries = connectionManager.getEntrySet(); final Set<MasterSlaveEntry> entries = connectionManager.getEntrySet();
Promise<T> promise = connectionManager.newPromise();
final AtomicInteger counter = new AtomicInteger(entries.size()); final AtomicInteger counter = new AtomicInteger(entries.size());
promise.addListener(new FutureListener<T>() { FutureListener<T> listener = new FutureListener<T>() {
@Override @Override
public void operationComplete(Future<T> future) throws Exception { public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
mainPromise.setFailure(future.cause()); mainPromise.tryFailure(future.cause());
return; return;
} }
@ -365,7 +366,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
mainPromise.setSuccess(callback.onFinish()); mainPromise.setSuccess(callback.onFinish());
} }
} }
}); };
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length); List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script); args.add(script);
@ -373,6 +374,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
args.addAll(keys); args.addAll(keys);
args.addAll(Arrays.asList(params)); args.addAll(Arrays.asList(params));
for (MasterSlaveEntry entry : entries) { for (MasterSlaveEntry entry : entries) {
Promise<T> promise = connectionManager.newPromise();
promise.addListener(listener);
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0); async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0);
} }
return mainPromise; return mainPromise;

Loading…
Cancel
Save