|
|
|
@ -215,7 +215,10 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
|
|
|
|
|
CommandBatchService executorService = createCommandBatchService();
|
|
|
|
|
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName());
|
|
|
|
|
int availableSlaves = entry.getAvailableSlaves();
|
|
|
|
|
|
|
|
|
|
CommandBatchService executorService = createCommandBatchService(availableSlaves);
|
|
|
|
|
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
|
|
|
|
|
if (commandExecutor instanceof CommandBatchService) {
|
|
|
|
|
return result;
|
|
|
|
@ -228,20 +231,22 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
|
|
|
|
|
r.tryFailure(ex);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (res.getSyncedSlaves() != availableSlaves) {
|
|
|
|
|
r.tryFailure(new IllegalStateException("Only " + res.getSyncedSlaves() + " of " + availableSlaves + " slaves were synced"));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
r.trySuccess(result.getNow());
|
|
|
|
|
});
|
|
|
|
|
return r;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private CommandBatchService createCommandBatchService() {
|
|
|
|
|
private CommandBatchService createCommandBatchService(int availableSlaves) {
|
|
|
|
|
if (commandExecutor instanceof CommandBatchService) {
|
|
|
|
|
return (CommandBatchService) commandExecutor;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName());
|
|
|
|
|
BatchOptions options = BatchOptions.defaults()
|
|
|
|
|
.syncSlaves(entry.getAvailableSlaves(), 1, TimeUnit.SECONDS);
|
|
|
|
|
.syncSlaves(availableSlaves, 1, TimeUnit.SECONDS);
|
|
|
|
|
|
|
|
|
|
return new CommandBatchService(commandExecutor, options);
|
|
|
|
|
}
|
|
|
|
|