|
|
@ -21,6 +21,7 @@ import java.util.Arrays;
|
|
|
|
import java.util.Collection;
|
|
|
|
import java.util.Collection;
|
|
|
|
import java.util.Collections;
|
|
|
|
import java.util.Collections;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.List;
|
|
|
|
|
|
|
|
import java.util.Set;
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
|
|
|
|
|
@ -181,10 +182,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
return allAsync(true, command, callback, params);
|
|
|
|
return allAsync(true, command, callback, params);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public <T, R> Future<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object ... params) {
|
|
|
|
private <T, R> Future<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object ... params) {
|
|
|
|
final Promise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
final Promise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
|
|
|
|
final Set<ClusterSlotRange> slots = connectionManager.getEntries().keySet();
|
|
|
|
Promise<T> promise = new DefaultPromise<T>() {
|
|
|
|
Promise<T> promise = new DefaultPromise<T>() {
|
|
|
|
AtomicInteger counter = new AtomicInteger(connectionManager.getEntries().keySet().size());
|
|
|
|
AtomicInteger counter = new AtomicInteger(slots.size());
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public Promise<T> setSuccess(T result) {
|
|
|
|
public Promise<T> setSuccess(T result) {
|
|
|
|
if (callback != null) {
|
|
|
|
if (callback != null) {
|
|
|
@ -206,7 +208,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
return this;
|
|
|
|
return this;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
};
|
|
|
|
for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
|
|
|
|
for (ClusterSlotRange slot : slots) {
|
|
|
|
async(readOnlyMode, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, params, promise, 0);
|
|
|
|
async(readOnlyMode, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, params, promise, 0);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return mainPromise;
|
|
|
|
return mainPromise;
|
|
|
|