|
|
|
@ -98,7 +98,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
|
|
|
|
|
public static class Entry {
|
|
|
|
|
|
|
|
|
|
Deque<BatchCommandData<?, ?>> commands = new LinkedBlockingDeque<BatchCommandData<?,?>>();
|
|
|
|
|
Deque<BatchCommandData<?, ?>> commands = new LinkedBlockingDeque<>();
|
|
|
|
|
volatile boolean readOnlyMode = true;
|
|
|
|
|
|
|
|
|
|
public Deque<BatchCommandData<?, ?>> getCommands() {
|
|
|
|
@ -122,7 +122,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private final AtomicInteger index = new AtomicInteger();
|
|
|
|
|
private AtomicInteger index = new AtomicInteger();
|
|
|
|
|
|
|
|
|
|
private ConcurrentMap<MasterSlaveEntry, Entry> commands = PlatformDependent.newConcurrentHashMap();
|
|
|
|
|
private ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections = PlatformDependent.newConcurrentHashMap();
|
|
|
|
@ -206,7 +206,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected <V, R> void handleSuccess(final AsyncDetails<V, R> details, RPromise<R> promise, RedisCommand<?> command, R res) {
|
|
|
|
|
protected <V, R> void handleSuccess(AsyncDetails<V, R> details, RPromise<R> promise, RedisCommand<?> command, R res) {
|
|
|
|
|
if (RedisCommands.EXEC.getName().equals(command.getName())) {
|
|
|
|
|
super.handleSuccess(details, promise, command, res);
|
|
|
|
|
return;
|
|
|
|
@ -230,7 +230,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected <V, R> void handleError(final AsyncDetails<V, R> details, RPromise<R> promise, Throwable cause) {
|
|
|
|
|
protected <V, R> void handleError(AsyncDetails<V, R> details, RPromise<R> promise, Throwable cause) {
|
|
|
|
|
if (isRedisBasedQueue() && promise instanceof BatchPromise) {
|
|
|
|
|
BatchPromise<R> batchPromise = (BatchPromise<R>) promise;
|
|
|
|
|
RPromise<R> sentPromise = (RPromise<R>) batchPromise.getSentPromise();
|
|
|
|
@ -368,7 +368,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RFuture<Void> executeAsyncVoid() {
|
|
|
|
|
final RedissonPromise<Void> promise = new RedissonPromise<Void>();
|
|
|
|
|
RedissonPromise<Void> promise = new RedissonPromise<Void>();
|
|
|
|
|
RFuture<BatchResult<?>> resFuture = executeAsync(BatchOptions.defaults());
|
|
|
|
|
resFuture.onComplete((res, e) -> {
|
|
|
|
|
if (e == null) {
|
|
|
|
@ -384,6 +384,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
return executeAsync(BatchOptions.defaults());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings("MethodLength")
|
|
|
|
|
public <R> RFuture<R> executeAsync(BatchOptions options) {
|
|
|
|
|
if (executed.get()) {
|
|
|
|
|
throw new IllegalStateException("Batch already executed!");
|
|
|
|
@ -403,7 +404,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
int permits = 0;
|
|
|
|
|
for (Entry entry : commands.values()) {
|
|
|
|
|
permits += entry.getCommands().size();
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RPromise<R> resultPromise = new RedissonPromise<R>();
|
|
|
|
|
semaphore.acquire(new Runnable() {
|
|
|
|
@ -491,7 +492,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
BatchResult<Object> r = new BatchResult<Object>(responses, syncedSlaves);
|
|
|
|
|
resultPromise.trySuccess((R)r);
|
|
|
|
|
resultPromise.trySuccess((R) r);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
resultPromise.tryFailure(e);
|
|
|
|
|
}
|
|
|
|
@ -530,7 +531,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RPromise<R> resultPromise;
|
|
|
|
|
final RPromise<Void> voidPromise = new RedissonPromise<Void>();
|
|
|
|
|
RPromise<Void> voidPromise = new RedissonPromise<Void>();
|
|
|
|
|
if (this.options.isSkipResult()) {
|
|
|
|
|
voidPromise.onComplete((res, e) -> {
|
|
|
|
|
// commands = null;
|
|
|
|
@ -539,7 +540,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
});
|
|
|
|
|
resultPromise = (RPromise<R>) voidPromise;
|
|
|
|
|
} else {
|
|
|
|
|
final RPromise<Object> promise = new RedissonPromise<Object>();
|
|
|
|
|
RPromise<Object> promise = new RedissonPromise<Object>();
|
|
|
|
|
voidPromise.onComplete((res, ex) -> {
|
|
|
|
|
executed.set(true);
|
|
|
|
|
if (ex != null) {
|
|
|
|
@ -576,7 +577,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
resultPromise = (RPromise<R>) promise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final AtomicInteger slots = new AtomicInteger(commands.size());
|
|
|
|
|
AtomicInteger slots = new AtomicInteger(commands.size());
|
|
|
|
|
|
|
|
|
|
for (java.util.Map.Entry<RFuture<?>, List<CommandBatchService>> entry : nestedServices.entrySet()) {
|
|
|
|
|
slots.incrementAndGet();
|
|
|
|
@ -599,8 +600,9 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
return options != null && (this.options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC || this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots,
|
|
|
|
|
final int attempt, final BatchOptions options) {
|
|
|
|
|
@SuppressWarnings({"MethodLength", "NestedIfDepth"})
|
|
|
|
|
private void execute(Entry entry, NodeSource source, RPromise<Void> mainPromise, AtomicInteger slots,
|
|
|
|
|
int attempt, BatchOptions options) {
|
|
|
|
|
if (mainPromise.isCancelled()) {
|
|
|
|
|
free(entry);
|
|
|
|
|
return;
|
|
|
|
@ -612,9 +614,9 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final RPromise<Void> attemptPromise = new RedissonPromise<Void>();
|
|
|
|
|
RPromise<Void> attemptPromise = new RedissonPromise<Void>();
|
|
|
|
|
|
|
|
|
|
final AsyncDetails details = new AsyncDetails();
|
|
|
|
|
AsyncDetails details = new AsyncDetails();
|
|
|
|
|
details.init(null, attemptPromise,
|
|
|
|
|
entry.isReadOnlyMode(), source, null, null, null, mainPromise, attempt);
|
|
|
|
|
|
|
|
|
@ -654,7 +656,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
final TimerTask retryTimerTask = new TimerTask() {
|
|
|
|
|
TimerTask retryTimerTask = new TimerTask() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run(Timeout t) throws Exception {
|
|
|
|
|
if (attemptPromise.isDone()) {
|
|
|
|
@ -732,14 +734,14 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
skip.set(true);
|
|
|
|
|
|
|
|
|
|
if (e instanceof RedisMovedException) {
|
|
|
|
|
RedisMovedException ex = (RedisMovedException)e;
|
|
|
|
|
RedisMovedException ex = (RedisMovedException) e;
|
|
|
|
|
entry.clearErrors();
|
|
|
|
|
NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED);
|
|
|
|
|
execute(entry, nodeSource, mainPromise, slots, attempt, options);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (e instanceof RedisAskException) {
|
|
|
|
|
RedisAskException ex = (RedisAskException)e;
|
|
|
|
|
RedisAskException ex = (RedisAskException) e;
|
|
|
|
|
entry.clearErrors();
|
|
|
|
|
NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK);
|
|
|
|
|
execute(entry, nodeSource, mainPromise, slots, attempt, options);
|
|
|
|
@ -765,14 +767,14 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected void free(final Entry entry) {
|
|
|
|
|
protected void free(Entry entry) {
|
|
|
|
|
for (BatchCommandData<?, ?> command : entry.getCommands()) {
|
|
|
|
|
free(command.getParams());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void checkWriteFuture(final Entry entry, final RPromise<Void> attemptPromise, final AsyncDetails details,
|
|
|
|
|
final RedisConnection connection, ChannelFuture future, long responseTimeout, int attempts, final AtomicInteger slots, final RPromise<Void> mainPromise) {
|
|
|
|
|
private void checkWriteFuture(Entry entry, RPromise<Void> attemptPromise, AsyncDetails details,
|
|
|
|
|
RedisConnection connection, ChannelFuture future, long responseTimeout, int attempts, AtomicInteger slots, RPromise<Void> mainPromise) {
|
|
|
|
|
if (future.isCancelled() || attemptPromise.isDone()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -820,10 +822,11 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
details.setTimeout(timeoutTask);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void checkConnectionFuture(final Entry entry, final NodeSource source,
|
|
|
|
|
final RPromise<Void> mainPromise, final RPromise<Void> attemptPromise, final AsyncDetails details,
|
|
|
|
|
RFuture<RedisConnection> connFuture, final boolean noResult, final long responseTimeout, final int attempts,
|
|
|
|
|
ExecutionMode executionMode, final AtomicInteger slots) {
|
|
|
|
|
@SuppressWarnings("ParameterNumber")
|
|
|
|
|
private void checkConnectionFuture(Entry entry, NodeSource source,
|
|
|
|
|
RPromise<Void> mainPromise, RPromise<Void> attemptPromise, AsyncDetails details,
|
|
|
|
|
RFuture<RedisConnection> connFuture, boolean noResult, long responseTimeout, int attempts,
|
|
|
|
|
ExecutionMode executionMode, AtomicInteger slots) {
|
|
|
|
|
if (connFuture.isCancelled()) {
|
|
|
|
|
connectionManager.getShutdownLatch().release();
|
|
|
|
|
return;
|
|
|
|
@ -840,7 +843,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final RedisConnection connection = connFuture.getNow();
|
|
|
|
|
RedisConnection connection = connFuture.getNow();
|
|
|
|
|
boolean isAtomic = executionMode != ExecutionMode.IN_MEMORY;
|
|
|
|
|
boolean isQueued = executionMode == ExecutionMode.REDIS_READ_ATOMIC || executionMode == ExecutionMode.REDIS_WRITE_ATOMIC;
|
|
|
|
|
|
|
|
|
@ -874,7 +877,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
return c.getCommand().getName().equals(RedisCommands.WAIT.getName());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected void handle(final RPromise<Void> mainPromise, final AtomicInteger slots, RFuture<?> future) {
|
|
|
|
|
protected void handle(RPromise<Void> mainPromise, AtomicInteger slots, RFuture<?> future) {
|
|
|
|
|
if (future.isSuccess()) {
|
|
|
|
|
if (slots.decrementAndGet() == 0) {
|
|
|
|
|
mainPromise.trySuccess(null);
|
|
|
|
|