|
|
@ -42,6 +42,7 @@ import org.redisson.client.RedisConnection;
|
|
|
|
import org.redisson.client.RedisException;
|
|
|
|
import org.redisson.client.RedisException;
|
|
|
|
import org.redisson.client.RedisLoadingException;
|
|
|
|
import org.redisson.client.RedisLoadingException;
|
|
|
|
import org.redisson.client.RedisMovedException;
|
|
|
|
import org.redisson.client.RedisMovedException;
|
|
|
|
|
|
|
|
import org.redisson.client.RedisRedirectException;
|
|
|
|
import org.redisson.client.RedisTimeoutException;
|
|
|
|
import org.redisson.client.RedisTimeoutException;
|
|
|
|
import org.redisson.client.RedisTryAgainException;
|
|
|
|
import org.redisson.client.RedisTryAgainException;
|
|
|
|
import org.redisson.client.WriteRedisConnectionException;
|
|
|
|
import org.redisson.client.WriteRedisConnectionException;
|
|
|
@ -62,6 +63,7 @@ import org.redisson.connection.NodeSource.Redirect;
|
|
|
|
import org.redisson.misc.LogHelper;
|
|
|
|
import org.redisson.misc.LogHelper;
|
|
|
|
import org.redisson.misc.RPromise;
|
|
|
|
import org.redisson.misc.RPromise;
|
|
|
|
import org.redisson.misc.RedissonObjectFactory;
|
|
|
|
import org.redisson.misc.RedissonObjectFactory;
|
|
|
|
|
|
|
|
import org.redisson.misc.RedissonPromise;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
|
|
|
@ -181,36 +183,36 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
|
|
|
|
public <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
|
|
|
|
RPromise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
RPromise<R> mainPromise = new RedissonPromise<R>();
|
|
|
|
async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0);
|
|
|
|
async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false);
|
|
|
|
return mainPromise;
|
|
|
|
return mainPromise;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object... params) {
|
|
|
|
public <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object... params) {
|
|
|
|
RPromise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
RPromise<R> mainPromise = new RedissonPromise<R>();
|
|
|
|
int slot = connectionManager.calcSlot(name);
|
|
|
|
int slot = connectionManager.calcSlot(name);
|
|
|
|
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0);
|
|
|
|
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false);
|
|
|
|
return mainPromise;
|
|
|
|
return mainPromise;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object... params) {
|
|
|
|
public <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object... params) {
|
|
|
|
RPromise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
RPromise<R> mainPromise = new RedissonPromise<R>();
|
|
|
|
async(true, new NodeSource(client), codec, command, params, mainPromise, 0);
|
|
|
|
async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false);
|
|
|
|
return mainPromise;
|
|
|
|
return mainPromise;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) {
|
|
|
|
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) {
|
|
|
|
final RPromise<Collection<R>> mainPromise = connectionManager.newPromise();
|
|
|
|
final RPromise<Collection<R>> mainPromise = new RedissonPromise<Collection<R>>();
|
|
|
|
final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
|
|
|
|
final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
|
|
|
|
final List<R> results = new ArrayList<R>();
|
|
|
|
final List<R> results = new ArrayList<R>();
|
|
|
|
final AtomicInteger counter = new AtomicInteger(nodes.size());
|
|
|
|
final AtomicInteger counter = new AtomicInteger(nodes.size());
|
|
|
|
FutureListener<R> listener = new FutureListener<R>() {
|
|
|
|
FutureListener<R> listener = new FutureListener<R>() {
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void operationComplete(Future<R> future) throws Exception {
|
|
|
|
public void operationComplete(Future<R> future) throws Exception {
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
if (!future.isSuccess() && !(future.cause() instanceof RedisRedirectException)) {
|
|
|
|
mainPromise.tryFailure(future.cause());
|
|
|
|
mainPromise.tryFailure(future.cause());
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -234,16 +236,16 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
for (MasterSlaveEntry entry : nodes) {
|
|
|
|
for (MasterSlaveEntry entry : nodes) {
|
|
|
|
RPromise<R> promise = connectionManager.newPromise();
|
|
|
|
RPromise<R> promise = new RedissonPromise<R>();
|
|
|
|
promise.addListener(listener);
|
|
|
|
promise.addListener(listener);
|
|
|
|
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0);
|
|
|
|
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return mainPromise;
|
|
|
|
return mainPromise;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public <T, R> RFuture<R> readRandomAsync(RedisCommand<T> command, Object... params) {
|
|
|
|
public <T, R> RFuture<R> readRandomAsync(RedisCommand<T> command, Object... params) {
|
|
|
|
final RPromise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
final RPromise<R> mainPromise = new RedissonPromise<R>();
|
|
|
|
final List<MasterSlaveEntry> nodes = new ArrayList<MasterSlaveEntry>(connectionManager.getEntrySet());
|
|
|
|
final List<MasterSlaveEntry> nodes = new ArrayList<MasterSlaveEntry>(connectionManager.getEntrySet());
|
|
|
|
Collections.shuffle(nodes);
|
|
|
|
Collections.shuffle(nodes);
|
|
|
|
|
|
|
|
|
|
|
@ -253,7 +255,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
|
|
|
|
|
|
|
private <R, T> void retryReadRandomAsync(final RedisCommand<T> command, final RPromise<R> mainPromise,
|
|
|
|
private <R, T> void retryReadRandomAsync(final RedisCommand<T> command, final RPromise<R> mainPromise,
|
|
|
|
final List<MasterSlaveEntry> nodes, final Object... params) {
|
|
|
|
final List<MasterSlaveEntry> nodes, final Object... params) {
|
|
|
|
final RPromise<R> attemptPromise = connectionManager.newPromise();
|
|
|
|
final RPromise<R> attemptPromise = new RedissonPromise<R>();
|
|
|
|
attemptPromise.addListener(new FutureListener<R>() {
|
|
|
|
attemptPromise.addListener(new FutureListener<R>() {
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void operationComplete(Future<R> future) throws Exception {
|
|
|
|
public void operationComplete(Future<R> future) throws Exception {
|
|
|
@ -274,7 +276,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
MasterSlaveEntry entry = nodes.remove(0);
|
|
|
|
MasterSlaveEntry entry = nodes.remove(0);
|
|
|
|
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, attemptPromise, 0);
|
|
|
|
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, attemptPromise, 0, false);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
@ -293,19 +295,24 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private <T, R> RFuture<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object... params) {
|
|
|
|
private <T, R> RFuture<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object... params) {
|
|
|
|
final RPromise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
final RPromise<R> mainPromise = new RedissonPromise<R>();
|
|
|
|
final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
|
|
|
|
final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
|
|
|
|
final AtomicInteger counter = new AtomicInteger(nodes.size());
|
|
|
|
final AtomicInteger counter = new AtomicInteger(nodes.size());
|
|
|
|
FutureListener<T> listener = new FutureListener<T>() {
|
|
|
|
FutureListener<T> listener = new FutureListener<T>() {
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void operationComplete(Future<T> future) throws Exception {
|
|
|
|
public void operationComplete(Future<T> future) throws Exception {
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
if (!future.isSuccess() && !(future.cause() instanceof RedisRedirectException)) {
|
|
|
|
mainPromise.tryFailure(future.cause());
|
|
|
|
mainPromise.tryFailure(future.cause());
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
T result = future.getNow();
|
|
|
|
|
|
|
|
if (future.cause() instanceof RedisRedirectException) {
|
|
|
|
|
|
|
|
result = command.getConvertor().convert(result);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (callback != null) {
|
|
|
|
if (callback != null) {
|
|
|
|
callback.onSlotResult(future.getNow());
|
|
|
|
callback.onSlotResult(result);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (counter.decrementAndGet() == 0) {
|
|
|
|
if (counter.decrementAndGet() == 0) {
|
|
|
|
if (callback != null) {
|
|
|
|
if (callback != null) {
|
|
|
@ -318,9 +325,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
for (MasterSlaveEntry entry : nodes) {
|
|
|
|
for (MasterSlaveEntry entry : nodes) {
|
|
|
|
RPromise<T> promise = connectionManager.newPromise();
|
|
|
|
RPromise<T> promise = new RedissonPromise<T>();
|
|
|
|
promise.addListener(listener);
|
|
|
|
promise.addListener(listener);
|
|
|
|
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0);
|
|
|
|
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return mainPromise;
|
|
|
|
return mainPromise;
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -339,22 +346,22 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
|
|
|
|
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
|
|
|
|
RPromise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
RPromise<R> mainPromise = new RedissonPromise<R>();
|
|
|
|
NodeSource source = getNodeSource(key);
|
|
|
|
NodeSource source = getNodeSource(key);
|
|
|
|
async(true, source, codec, command, params, mainPromise, 0);
|
|
|
|
async(true, source, codec, command, params, mainPromise, 0, false);
|
|
|
|
return mainPromise;
|
|
|
|
return mainPromise;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
|
|
|
|
public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
|
|
|
|
RPromise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
RPromise<R> mainPromise = new RedissonPromise<R>();
|
|
|
|
async(true, new NodeSource(entry), codec, command, params, mainPromise, 0);
|
|
|
|
async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false);
|
|
|
|
return mainPromise;
|
|
|
|
return mainPromise;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
|
|
|
|
public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
|
|
|
|
RPromise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
RPromise<R> mainPromise = new RedissonPromise<R>();
|
|
|
|
async(false, new NodeSource(entry), codec, command, params, mainPromise, 0);
|
|
|
|
async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false);
|
|
|
|
return mainPromise;
|
|
|
|
return mainPromise;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -396,14 +403,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public <T, R> RFuture<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) {
|
|
|
|
public <T, R> RFuture<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) {
|
|
|
|
final RPromise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
final RPromise<R> mainPromise = new RedissonPromise<R>();
|
|
|
|
final Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
|
|
|
|
final Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
|
|
|
|
final AtomicInteger counter = new AtomicInteger(entries.size());
|
|
|
|
final AtomicInteger counter = new AtomicInteger(entries.size());
|
|
|
|
FutureListener<T> listener = new FutureListener<T>() {
|
|
|
|
FutureListener<T> listener = new FutureListener<T>() {
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void operationComplete(Future<T> future) throws Exception {
|
|
|
|
public void operationComplete(Future<T> future) throws Exception {
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
if (!future.isSuccess() && !(future.cause() instanceof RedisRedirectException)) {
|
|
|
|
mainPromise.tryFailure(future.cause());
|
|
|
|
mainPromise.tryFailure(future.cause());
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -422,21 +429,21 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
args.addAll(keys);
|
|
|
|
args.addAll(keys);
|
|
|
|
args.addAll(Arrays.asList(params));
|
|
|
|
args.addAll(Arrays.asList(params));
|
|
|
|
for (MasterSlaveEntry entry : entries) {
|
|
|
|
for (MasterSlaveEntry entry : entries) {
|
|
|
|
RPromise<T> promise = connectionManager.newPromise();
|
|
|
|
RPromise<T> promise = new RedissonPromise<T>();
|
|
|
|
promise.addListener(listener);
|
|
|
|
promise.addListener(listener);
|
|
|
|
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0);
|
|
|
|
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return mainPromise;
|
|
|
|
return mainPromise;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
|
|
|
|
private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
|
|
|
|
RPromise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
RPromise<R> mainPromise = new RedissonPromise<R>();
|
|
|
|
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
|
|
|
|
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
|
|
|
|
args.add(script);
|
|
|
|
args.add(script);
|
|
|
|
args.add(keys.size());
|
|
|
|
args.add(keys.size());
|
|
|
|
args.addAll(keys);
|
|
|
|
args.addAll(keys);
|
|
|
|
args.addAll(Arrays.asList(params));
|
|
|
|
args.addAll(Arrays.asList(params));
|
|
|
|
async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0);
|
|
|
|
async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false);
|
|
|
|
return mainPromise;
|
|
|
|
return mainPromise;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -447,14 +454,15 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
|
|
|
|
public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
|
|
|
|
RPromise<R> mainPromise = connectionManager.newPromise();
|
|
|
|
RPromise<R> mainPromise = new RedissonPromise<R>();
|
|
|
|
NodeSource source = getNodeSource(key);
|
|
|
|
NodeSource source = getNodeSource(key);
|
|
|
|
async(false, source, codec, command, params, mainPromise, 0);
|
|
|
|
async(false, source, codec, command, params, mainPromise, 0, false);
|
|
|
|
return mainPromise;
|
|
|
|
return mainPromise;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
|
|
|
|
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
|
|
|
|
final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt) {
|
|
|
|
final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt,
|
|
|
|
|
|
|
|
final boolean ignoreRedirect) {
|
|
|
|
if (mainPromise.isCancelled()) {
|
|
|
|
if (mainPromise.isCancelled()) {
|
|
|
|
free(params);
|
|
|
|
free(params);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
@ -490,7 +498,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
connectionFuture = connectionManager.connectionWriteOp(source, command);
|
|
|
|
connectionFuture = connectionManager.connectionWriteOp(source, command);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
final RPromise<R> attemptPromise = connectionManager.newPromise();
|
|
|
|
final RPromise<R> attemptPromise = new RedissonPromise<R>();
|
|
|
|
details.init(connectionFuture, attemptPromise,
|
|
|
|
details.init(connectionFuture, attemptPromise,
|
|
|
|
readOnlyMode, source, codec, command, params, mainPromise, attempt);
|
|
|
|
readOnlyMode, source, codec, command, params, mainPromise, attempt);
|
|
|
|
|
|
|
|
|
|
|
@ -566,7 +574,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
count, details.getCommand(), Arrays.toString(details.getParams()));
|
|
|
|
count, details.getCommand(), Arrays.toString(details.getParams()));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
details.removeMainPromiseListener();
|
|
|
|
details.removeMainPromiseListener();
|
|
|
|
async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count);
|
|
|
|
async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect);
|
|
|
|
AsyncDetails.release(details);
|
|
|
|
AsyncDetails.release(details);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -597,10 +605,10 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
final RedisConnection connection = connFuture.getNow();
|
|
|
|
final RedisConnection connection = connFuture.getNow();
|
|
|
|
if (details.getSource().getRedirect() == Redirect.ASK) {
|
|
|
|
if (details.getSource().getRedirect() == Redirect.ASK) {
|
|
|
|
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
|
|
|
|
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
|
|
|
|
RPromise<Void> promise = connectionManager.newPromise();
|
|
|
|
RPromise<Void> promise = new RedissonPromise<Void>();
|
|
|
|
list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{}));
|
|
|
|
list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{}));
|
|
|
|
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
|
|
|
|
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
|
|
|
|
RPromise<Void> main = connectionManager.newPromise();
|
|
|
|
RPromise<Void> main = new RedissonPromise<Void>();
|
|
|
|
ChannelFuture future = connection.send(new CommandsData(main, list));
|
|
|
|
ChannelFuture future = connection.send(new CommandsData(main, list));
|
|
|
|
details.setWriteFuture(future);
|
|
|
|
details.setWriteFuture(future);
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
@ -626,7 +634,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
attemptPromise.addListener(new FutureListener<R>() {
|
|
|
|
attemptPromise.addListener(new FutureListener<R>() {
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void operationComplete(Future<R> future) throws Exception {
|
|
|
|
public void operationComplete(Future<R> future) throws Exception {
|
|
|
|
checkAttemptFuture(source, details, future);
|
|
|
|
checkAttemptFuture(source, details, future, ignoreRedirect);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -780,7 +788,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private <R, V> void checkAttemptFuture(final NodeSource source, final AsyncDetails<V, R> details,
|
|
|
|
private <R, V> void checkAttemptFuture(final NodeSource source, final AsyncDetails<V, R> details,
|
|
|
|
Future<R> future) {
|
|
|
|
Future<R> future, boolean ignoreRedirect) {
|
|
|
|
details.getTimeout().cancel();
|
|
|
|
details.getTimeout().cancel();
|
|
|
|
if (future.isCancelled()) {
|
|
|
|
if (future.isCancelled()) {
|
|
|
|
return;
|
|
|
|
return;
|
|
|
@ -788,25 +796,25 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
|
|
|
|
|
|
|
details.removeMainPromiseListener();
|
|
|
|
details.removeMainPromiseListener();
|
|
|
|
|
|
|
|
|
|
|
|
if (future.cause() instanceof RedisMovedException) {
|
|
|
|
if (future.cause() instanceof RedisMovedException && !ignoreRedirect) {
|
|
|
|
RedisMovedException ex = (RedisMovedException) future.cause();
|
|
|
|
RedisMovedException ex = (RedisMovedException) future.cause();
|
|
|
|
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(),
|
|
|
|
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(),
|
|
|
|
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
|
|
|
|
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
|
|
|
|
AsyncDetails.release(details);
|
|
|
|
AsyncDetails.release(details);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (future.cause() instanceof RedisAskException) {
|
|
|
|
if (future.cause() instanceof RedisAskException && !ignoreRedirect) {
|
|
|
|
RedisAskException ex = (RedisAskException) future.cause();
|
|
|
|
RedisAskException ex = (RedisAskException) future.cause();
|
|
|
|
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(),
|
|
|
|
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(),
|
|
|
|
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
|
|
|
|
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
|
|
|
|
AsyncDetails.release(details);
|
|
|
|
AsyncDetails.release(details);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (future.cause() instanceof RedisLoadingException) {
|
|
|
|
if (future.cause() instanceof RedisLoadingException) {
|
|
|
|
async(details.isReadOnlyMode(), source, details.getCodec(),
|
|
|
|
async(details.isReadOnlyMode(), source, details.getCodec(),
|
|
|
|
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
|
|
|
|
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
|
|
|
|
AsyncDetails.release(details);
|
|
|
|
AsyncDetails.release(details);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -816,7 +824,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void run(Timeout timeout) throws Exception {
|
|
|
|
public void run(Timeout timeout) throws Exception {
|
|
|
|
async(details.isReadOnlyMode(), source, details.getCodec(),
|
|
|
|
async(details.isReadOnlyMode(), source, details.getCodec(),
|
|
|
|
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
|
|
|
|
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}, 1, TimeUnit.SECONDS);
|
|
|
|
}, 1, TimeUnit.SECONDS);
|
|
|
|