|
|
|
@ -207,33 +207,36 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
|
|
|
|
|
|
|
|
|
|
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
|
|
|
|
|
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName());
|
|
|
|
|
int availableSlaves;
|
|
|
|
|
if (entry != null) {
|
|
|
|
|
availableSlaves = entry.getAvailableSlaves();
|
|
|
|
|
} else {
|
|
|
|
|
availableSlaves = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CommandBatchService executorService = createCommandBatchService(availableSlaves);
|
|
|
|
|
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
|
|
|
|
|
if (commandExecutor instanceof CommandBatchService) {
|
|
|
|
|
return result;
|
|
|
|
|
CompletionStage<Map<String, String>> replicationFuture = CompletableFuture.completedFuture(Collections.emptyMap());
|
|
|
|
|
if (!(commandExecutor instanceof CommandBatchService) && entry != null && entry.getAvailableSlaves() > 0) {
|
|
|
|
|
replicationFuture = commandExecutor.writeAsync(entry, null, RedisCommands.INFO_REPLICATION);
|
|
|
|
|
}
|
|
|
|
|
CompletionStage<T> resFuture = replicationFuture.thenCompose(r -> {
|
|
|
|
|
Integer availableSlaves = Integer.valueOf(r.getOrDefault("connected_slaves", "0"));
|
|
|
|
|
|
|
|
|
|
RFuture<BatchResult<?>> future = executorService.executeAsync();
|
|
|
|
|
CompletionStage<T> f = future.handle((res, ex) -> {
|
|
|
|
|
if (ex != null) {
|
|
|
|
|
throw new CompletionException(ex);
|
|
|
|
|
}
|
|
|
|
|
if (commandExecutor.getConnectionManager().getCfg().isCheckLockSyncedSlaves()
|
|
|
|
|
&& res.getSyncedSlaves() == 0 && availableSlaves > 0) {
|
|
|
|
|
throw new CompletionException(
|
|
|
|
|
new IllegalStateException("None of slaves were synced"));
|
|
|
|
|
CommandBatchService executorService = createCommandBatchService(availableSlaves);
|
|
|
|
|
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
|
|
|
|
|
if (commandExecutor instanceof CommandBatchService) {
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return commandExecutor.getNow(result.toCompletableFuture());
|
|
|
|
|
RFuture<BatchResult<?>> future = executorService.executeAsync();
|
|
|
|
|
CompletionStage<T> f = future.handle((res, ex) -> {
|
|
|
|
|
if (ex != null) {
|
|
|
|
|
throw new CompletionException(ex);
|
|
|
|
|
}
|
|
|
|
|
if (commandExecutor.getConnectionManager().getCfg().isCheckLockSyncedSlaves()
|
|
|
|
|
&& res.getSyncedSlaves() == 0 && availableSlaves > 0) {
|
|
|
|
|
throw new CompletionException(
|
|
|
|
|
new IllegalStateException("None of slaves were synced"));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return commandExecutor.getNow(result.toCompletableFuture());
|
|
|
|
|
});
|
|
|
|
|
return f;
|
|
|
|
|
});
|
|
|
|
|
return new CompletableFutureWrapper<>(f);
|
|
|
|
|
return new CompletableFutureWrapper<>(resFuture);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private CommandBatchService createCommandBatchService(int availableSlaves) {
|
|
|
|
|