|
|
|
@ -103,15 +103,41 @@ public class CommandExecutorService implements CommandExecutor {
|
|
|
|
|
return mainPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <T, R> Future<R> readRandomAsync(RedisCommand<T> command, Codec codec, Object ... params) {
|
|
|
|
|
public <T, R> Future<R> readRandomAsync(final RedisCommand<T> command, final Object ... params) {
|
|
|
|
|
final Promise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
|
List<Integer> slots = new ArrayList<Integer>(connectionManager.getEntries().keySet());
|
|
|
|
|
final List<Integer> slots = new ArrayList<Integer>(connectionManager.getEntries().keySet());
|
|
|
|
|
Collections.shuffle(slots);
|
|
|
|
|
Integer slot = slots.get(0);
|
|
|
|
|
async(true, slot, null, codec, command, params, mainPromise, 0);
|
|
|
|
|
|
|
|
|
|
retryReadRandomAsync(command, mainPromise, slots, params);
|
|
|
|
|
return mainPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <R, T> void retryReadRandomAsync(final RedisCommand<T> command, final Promise<R> mainPromise,
|
|
|
|
|
final List<Integer> slots, final Object... params) {
|
|
|
|
|
final Promise<R> attemptPromise = connectionManager.newPromise();
|
|
|
|
|
attemptPromise.addListener(new FutureListener<R>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<R> future) throws Exception {
|
|
|
|
|
if (future.isSuccess()) {
|
|
|
|
|
if (future.getNow() == null) {
|
|
|
|
|
if (slots.isEmpty()) {
|
|
|
|
|
mainPromise.setSuccess(null);
|
|
|
|
|
} else {
|
|
|
|
|
retryReadRandomAsync(command, mainPromise, slots, params);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
mainPromise.setSuccess(future.getNow());
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
mainPromise.setFailure(future.cause());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
Integer slot = slots.remove(0);
|
|
|
|
|
async(true, slot, null, connectionManager.getCodec(), command, params, attemptPromise, 0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params) {
|
|
|
|
|
return writeAllAsync(command, null, params);
|
|
|
|
|
}
|
|
|
|
@ -177,14 +203,14 @@ public class CommandExecutorService implements CommandExecutor {
|
|
|
|
|
return mainPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <T, R> R read(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
Future<R> res = readAsync(slot, codec, command, params);
|
|
|
|
|
public <T, R> R write(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
Future<R> res = writeAsync(slot, codec, command, params);
|
|
|
|
|
return get(res);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <T, R> Future<R> readAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
Promise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
|
async(true, slot, null, codec, command, params, mainPromise, 0);
|
|
|
|
|
async(false, slot, null, codec, command, params, mainPromise, 0);
|
|
|
|
|
return mainPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|