Fixed - RLock throws ERR unknown command 'wait' on AWS Elasticache serverless. #5586

pull/5599/head
Nikita Koksharov 1 year ago
parent b93bef8fac
commit d91031f5ac

@ -460,14 +460,16 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return result.toArray(); return result.toArray();
} }
private final AtomicBoolean evalShaROSupported = new AtomicBoolean(true); private static final AtomicBoolean EVAL_SHA_RO_SUPPORTED = new AtomicBoolean(true);
private static final AtomicReference<Boolean> WAIT_SUPPORTED = new AtomicReference<>();
public boolean isEvalShaROSupported() { public boolean isEvalShaROSupported() {
return evalShaROSupported.get(); return EVAL_SHA_RO_SUPPORTED.get();
} }
public void setEvalShaROSupported(boolean value) { public void setEvalShaROSupported(boolean value) {
this.evalShaROSupported.set(value); this.EVAL_SHA_RO_SUPPORTED.set(value);
} }
private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType,
@ -480,7 +482,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
CompletableFuture<R> promise = new CompletableFuture<>(); CompletableFuture<R> promise = new CompletableFuture<>();
String sha1 = getServiceManager().calcSHA(script); String sha1 = getServiceManager().calcSHA(script);
RedisCommand cmd; RedisCommand cmd;
if (readOnlyMode && evalShaROSupported.get()) { if (readOnlyMode && EVAL_SHA_RO_SUPPORTED.get()) {
cmd = new RedisCommand(evalCommandType, "EVALSHA_RO"); cmd = new RedisCommand(evalCommandType, "EVALSHA_RO");
} else { } else {
cmd = new RedisCommand(evalCommandType, "EVALSHA"); cmd = new RedisCommand(evalCommandType, "EVALSHA");
@ -500,7 +502,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
promise.whenComplete((res, e) -> { promise.whenComplete((res, e) -> {
if (e != null) { if (e != null) {
if (e.getMessage().startsWith("ERR unknown command")) { if (e.getMessage().startsWith("ERR unknown command")) {
evalShaROSupported.set(false); EVAL_SHA_RO_SUPPORTED.set(false);
RFuture<R> future = evalAsync(nodeSource, readOnlyMode, codec, evalCommandType, script, keys, noRetry, pps); RFuture<R> future = evalAsync(nodeSource, readOnlyMode, codec, evalCommandType, script, keys, noRetry, pps);
transfer(future.toCompletableFuture(), mainPromise); transfer(future.toCompletableFuture(), mainPromise);
} else if (e.getMessage().startsWith("NOSCRIPT")) { } else if (e.getMessage().startsWith("NOSCRIPT")) {
@ -568,13 +570,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return async(false, source, codec, command, params, false, false); return async(false, source, codec, command, params, false, false);
} }
private final AtomicBoolean sortRoSupported = new AtomicBoolean(true); private static final AtomicBoolean SORT_RO_SUPPORTED = new AtomicBoolean(true);
public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec, public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec,
RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry) { RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry) {
if (readOnlyMode && command.getName().equals("SORT") && !sortRoSupported.get()) { if (readOnlyMode && command.getName().equals("SORT") && !SORT_RO_SUPPORTED.get()) {
readOnlyMode = false; readOnlyMode = false;
} else if (readOnlyMode && command.getName().equals("SORT") && sortRoSupported.get()) { } else if (readOnlyMode && command.getName().equals("SORT") && SORT_RO_SUPPORTED.get()) {
RedisCommand cmd = new RedisCommand("SORT_RO", command.getReplayMultiDecoder()); RedisCommand cmd = new RedisCommand("SORT_RO", command.getReplayMultiDecoder());
CompletableFuture<R> mainPromise = createPromise(); CompletableFuture<R> mainPromise = createPromise();
RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, cmd, params, mainPromise, RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, cmd, params, mainPromise,
@ -584,7 +586,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
CompletableFuture<R> result = new CompletableFuture<>(); CompletableFuture<R> result = new CompletableFuture<>();
mainPromise.whenComplete((r, e) -> { mainPromise.whenComplete((r, e) -> {
if (e != null && e.getMessage().startsWith("ERR unknown command")) { if (e != null && e.getMessage().startsWith("ERR unknown command")) {
sortRoSupported.set(false); SORT_RO_SUPPORTED.set(false);
RFuture<R> future = async(false, source, codec, command, params, ignoreRedirect, noRetry); RFuture<R> future = async(false, source, codec, command, params, ignoreRedirect, noRetry);
transfer(future.toCompletableFuture(), result); transfer(future.toCompletableFuture(), result);
return; return;
@ -892,10 +894,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <T> CompletionStage<T> handleNoSync(CompletionStage<T> stage, Supplier<CompletionStage<?>> supplier) { public <T> CompletionStage<T> handleNoSync(CompletionStage<T> stage, Supplier<CompletionStage<?>> supplier) {
CompletionStage<T> s = stage.handle((r, ex) -> { CompletionStage<T> s = stage.handle((r, ex) -> {
if (ex != null) { if (ex != null) {
if (ex.getCause().getMessage().equals("None of slaves were synced")) { if (ex.getCause().getMessage() != null
&& ex.getCause().getMessage().equals("None of slaves were synced")) {
return supplier.get().handle((r1, e) -> { return supplier.get().handle((r1, e) -> {
if (e != null) { if (e != null) {
if (e.getCause().getMessage().equals("None of slaves were synced")) { if (ex.getCause().getMessage() != null
&& e.getCause().getMessage().equals("None of slaves were synced")) {
throw new CompletionException(ex.getCause()); throw new CompletionException(ex.getCause());
} }
e.getCause().addSuppressed(ex.getCause()); e.getCause().addSuppressed(ex.getCause());
@ -918,35 +922,56 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override @Override
public <T> RFuture<T> syncedEval(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) { public <T> RFuture<T> syncedEval(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
CompletionStage<Map<String, String>> replicationFuture = CompletableFuture.completedFuture(Collections.emptyMap()); if (getServiceManager().getCfg().isSingleConfig()
if (!getServiceManager().getCfg().isSingleConfig() || this instanceof CommandBatchService
&& !(this instanceof CommandBatchService)) { || (WAIT_SUPPORTED.get() != null && !WAIT_SUPPORTED.get())) {
replicationFuture = writeAsync(key, RedisCommands.INFO_REPLICATION); return evalWriteAsync(key, codec, evalCommandType, script, keys, params);
} }
CompletionStage<T> resFuture = replicationFuture.thenCompose(r -> {
int availableSlaves = Integer.parseInt(r.getOrDefault("connected_slaves", "0"));
CommandBatchService executorService = createCommandBatchService(availableSlaves);
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
if (executorService == this) {
return result;
}
RFuture<BatchResult<?>> future = executorService.executeAsync(); CompletionStage<Integer> waitFuture = CompletableFuture.completedFuture(0);
CompletionStage<T> f = future.handle((res, ex) -> { if (WAIT_SUPPORTED.get() == null) {
if (ex != null) { waitFuture = writeAsync(key, RedisCommands.WAIT, 0, 0);
throw new CompletionException(ex); }
CompletionStage<T> resFuture = waitFuture.handle((r2, ex2) -> {
if (ex2 != null) {
if (ex2.getMessage().startsWith("ERR unknown command")) {
WAIT_SUPPORTED.set(false);
CompletionStage<T> f = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
return f;
} }
if (getServiceManager().getCfg().isCheckLockSyncedSlaves() throw new CompletionException(ex2);
&& res.getSyncedSlaves() == 0 && availableSlaves > 0) { }
throw new CompletionException(
new IllegalStateException("None of slaves were synced. Try to increase slavesSyncTimeout setting or set checkLockSyncedSlaves = false.")); WAIT_SUPPORTED.set(true);
CompletionStage<Map<String, String>> replicationFuture = writeAsync(key, RedisCommands.INFO_REPLICATION);
CompletionStage<T> resultFuture = replicationFuture.thenCompose(r -> {
int availableSlaves = Integer.parseInt(r.getOrDefault("connected_slaves", "0"));
CommandBatchService executorService = createCommandBatchService(availableSlaves);
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
if (executorService == this) {
return result;
} }
return getNow(result.toCompletableFuture()); RFuture<BatchResult<?>> future = executorService.executeAsync();
CompletionStage<T> f = future.handle((res, ex) -> {
if (ex != null) {
throw new CompletionException(ex);
}
if (getServiceManager().getCfg().isCheckLockSyncedSlaves()
&& res.getSyncedSlaves() == 0 && availableSlaves > 0) {
throw new CompletionException(
new IllegalStateException("None of slaves were synced. Try to increase slavesSyncTimeout setting or set checkLockSyncedSlaves = false."));
}
return getNow(result.toCompletableFuture());
});
return f;
}); });
return f; return resultFuture;
}); }).thenCompose(f -> f);
return new CompletableFutureWrapper<>(resFuture); return new CompletableFutureWrapper<>(resFuture);
} }

@ -473,7 +473,8 @@ public class ServiceManager {
CompletionStage<T> future = supplier.get(); CompletionStage<T> future = supplier.get();
future.whenComplete((r, e) -> { future.whenComplete((r, e) -> {
if (e != null) { if (e != null) {
if (e.getCause().getMessage().equals("None of slaves were synced")) { if (e.getCause().getMessage() != null
&& e.getCause().getMessage().equals("None of slaves were synced")) {
if (attempts.decrementAndGet() < 0) { if (attempts.decrementAndGet() < 0) {
result.completeExceptionally(e); result.completeExceptionally(e);
return; return;

@ -185,7 +185,7 @@ public class RedissonLockTest extends BaseConcurrentTest {
config.useSingleServer().setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort()); config.useSingleServer().setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
Assertions.assertThrows(WriteRedisConnectionException.class, () -> { Assertions.assertThrows(RedisException.class, () -> {
RLock lock = redisson.getLock("myLock"); RLock lock = redisson.getLock("myLock");
// kill RedisServer while main thread is sleeping. // kill RedisServer while main thread is sleeping.

Loading…
Cancel
Save