Fixed - Remote service throws RedissonShutdownException #5947

pull/6077/head
Nikita Koksharov 7 months ago
parent 0e93e86022
commit 5ade3f228d

@ -81,7 +81,7 @@ public class ElementsSubscribeService {
f.whenComplete((r, e) -> {
if (e != null) {
if (e.getCause() instanceof RedissonShutdownException) {
if (serviceManager.isShuttingDown(e)) {
return;
}
@ -107,7 +107,7 @@ public class ElementsSubscribeService {
f.thenCompose(consumer).whenComplete((r, ex) -> {
if (ex != null) {
if (ex.getCause() instanceof RedissonShutdownException) {
if (serviceManager.isShuttingDown(ex)) {
return;
}

@ -143,7 +143,7 @@ public abstract class QueueTransferTask {
RFuture<Long> startTimeFuture = pushTaskAsync();
startTimeFuture.whenComplete((res, e) -> {
if (e != null) {
if (e instanceof RedissonShutdownException) {
if (serviceManager.isShuttingDown(e)) {
return;
}
log.error(e.getMessage(), e);

@ -144,11 +144,11 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e instanceof RedissonShutdownException) {
return;
}
if (e != null) {
if (getServiceManager().isShuttingDown(e)) {
return;
}
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
@ -291,7 +291,7 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
@Override
public RFuture<Void> unlockAsync(long threadId) {
return getServiceManager().execute(() -> unlockAsync0(threadId), null);
return getServiceManager().execute(() -> unlockAsync0(threadId));
}
private RFuture<Void> unlockAsync0(long threadId) {

@ -125,7 +125,7 @@ public class RedissonIdGenerator extends RedissonExpirable implements RIdGenerat
Arrays.asList(getRawName(), getAllocationSizeName()));
future.whenComplete((res, ex) -> {
if (ex != null) {
if (ex instanceof RedissonShutdownException) {
if (getServiceManager().isShuttingDown(ex)) {
return;
}

@ -110,9 +110,6 @@ public class RedissonLock extends RedissonBaseLock {
if (ttl == null) {
return;
}
if (ttl == Long.MIN_VALUE) {
throw new InterruptedException();
}
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
@ -130,9 +127,6 @@ public class RedissonLock extends RedissonBaseLock {
if (ttl == null) {
break;
}
if (ttl == Long.MIN_VALUE) {
throw new InterruptedException();
}
// waiting for message
if (ttl >= 0) {
@ -163,7 +157,7 @@ public class RedissonLock extends RedissonBaseLock {
}
private RFuture<Long> tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId), Long.MIN_VALUE);
return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
@ -239,9 +233,6 @@ public class RedissonLock extends RedissonBaseLock {
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == Long.MIN_VALUE) {
return false;
}
// lock acquired
if (ttl == null) {
return true;
@ -289,9 +280,6 @@ public class RedissonLock extends RedissonBaseLock {
if (ttl == null) {
return true;
}
if (ttl == Long.MIN_VALUE) {
return false;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
@ -459,7 +447,7 @@ public class RedissonLock extends RedissonBaseLock {
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
return getServiceManager().execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId), false);
return getServiceManager().execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId));
}
@Override

@ -371,7 +371,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return getServiceManager().execute(() -> {
RFuture<List<String>> future = tryAcquireAsync(ids, timeoutDate);
return commandExecutor.handleNoSync(future, () -> releaseAsync(ids));
}, Collections.emptyList());
});
}
private RFuture<List<String>> tryAcquireAsync(List<String> ids, long timeoutDate) {

@ -183,7 +183,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
ff.whenComplete((res, ex) -> {
if (ex != null) {
if (ex instanceof RedissonShutdownException) {
if (getServiceManager().isShuttingDown(ex)) {
return;
}
@ -250,7 +250,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
updateFuture.whenComplete((re, exc) -> {
if (exc != null) {
if (exc instanceof RedissonShutdownException) {
if (getServiceManager().isShuttingDown(exc)) {
return;
}
log.error("Unable to update subscriber status", exc);

@ -242,7 +242,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
if (e != null) {
if (e instanceof RedissonShutdownException) {
if (commandExecutor.getServiceManager().isShuttingDown(e)) {
return;
}
log.error("Can't process the remote service request. A new attempt has been made.", e);
@ -277,7 +277,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
RFuture<RemoteServiceRequest> taskFuture = getTask(requestId, tasks);
taskFuture.whenComplete((request, exc) -> {
if (exc != null) {
if (exc instanceof RedissonShutdownException) {
if (commandExecutor.getServiceManager().isShuttingDown(exc)) {
return;
}
log.error("Can't process the remote service request with id {}. Try to increase 'retryInterval' and/or 'retryAttempts' settings", requestId, exc);
@ -327,7 +327,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
ackClientsFuture.whenComplete((r, ex) -> {
if (ex != null) {
if (ex instanceof RedissonShutdownException) {
if (commandExecutor.getServiceManager().isShuttingDown(ex)) {
return;
}
log.error("Can't send ack for request: {}. Try to increase 'retryInterval' and/or 'retryAttempts' settings", request, ex);
@ -347,7 +347,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
RFuture<Boolean> addFuture = list.addAsync(new RemoteServiceAck(request.getId()));
addFuture.whenComplete((res, exce) -> {
if (exce != null) {
if (exce instanceof RedissonShutdownException) {
if (commandExecutor.getServiceManager().isShuttingDown(exce)) {
return;
}
log.error("Can't send ack for request: {}. Try to increase 'retryInterval' and/or 'retryAttempts' settings", request, exce);
@ -365,7 +365,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
executeMethod(remoteInterface, requestQueue, executor, request, bean);
})
.exceptionally(exack -> {
if (exack instanceof RedissonShutdownException) {
if (commandExecutor.getServiceManager().isShuttingDown(exack)) {
return null;
}
log.error("Can't send ack for request: {}", request, exack);
@ -377,7 +377,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
})
.exceptionally(exc -> {
if (exc instanceof RedissonShutdownException) {
if (commandExecutor.getServiceManager().isShuttingDown(exc)) {
return null;
}
log.error("Can't process the remote service request with id {}", requestId, exc);
@ -424,7 +424,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
clientsFuture.whenComplete((res, exc) -> {
if (exc != null) {
if (exc instanceof RedissonShutdownException) {
if (commandExecutor.getServiceManager().isShuttingDown(exc)) {
return;
}
log.error("Can't send response: {} for request: {}. Try to increase 'retryInterval' and/or 'retryAttempts' settings", response, request, exc);

@ -263,7 +263,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
return commandExecutor.getServiceManager().execute(() -> {
RFuture<Boolean> future = tryAcquireAsync0(permits);
return commandExecutor.handleNoSync(future, () -> releaseAsync(permits));
}, false);
});
}
private RFuture<Boolean> tryAcquireAsync0(int permits) {

@ -487,7 +487,7 @@ public class RedisExecutor<V, R> {
return;
}
if (e instanceof RedissonShutdownException) {
if (connectionManager.getServiceManager().isShuttingDown(e)) {
attemptPromise.completeExceptionally(e);
}
});

@ -275,6 +275,11 @@ public final class ServiceManager {
return shutdownLatch.get();
}
public boolean isShuttingDown(Throwable e) {
return e instanceof RedissonShutdownException
|| e.getCause() instanceof RedissonShutdownException;
}
public boolean isShutdown() {
return group.isTerminated();
}
@ -503,22 +508,18 @@ public final class ServiceManager {
});
}
public <T> RFuture<T> execute(Supplier<CompletionStage<T>> supplier, T defaultValue) {
public <T> RFuture<T> execute(Supplier<CompletionStage<T>> supplier) {
CompletableFuture<T> result = new CompletableFuture<>();
int retryAttempts = config.getRetryAttempts();
AtomicInteger attempts = new AtomicInteger(retryAttempts);
execute(attempts, result, supplier, defaultValue);
execute(attempts, result, supplier);
return new CompletableFutureWrapper<>(result);
}
private <T> void execute(AtomicInteger attempts, CompletableFuture<T> result, Supplier<CompletionStage<T>> supplier, T defaultValue) {
private <T> void execute(AtomicInteger attempts, CompletableFuture<T> result, Supplier<CompletionStage<T>> supplier) {
CompletionStage<T> future = supplier.get();
future.whenComplete((r, e) -> {
if (e != null) {
if (e instanceof RedissonShutdownException) {
result.complete(defaultValue);
return;
}
if (e.getCause().getMessage() != null
&& e.getCause().getMessage().equals("None of slaves were synced")) {
if (attempts.decrementAndGet() < 0) {
@ -526,7 +527,7 @@ public final class ServiceManager {
return;
}
newTimeout(t -> execute(attempts, result, supplier, defaultValue),
newTimeout(t -> execute(attempts, result, supplier),
config.getRetryInterval(), TimeUnit.MILLISECONDS);
return;
}

@ -183,7 +183,7 @@ public abstract class BaseRemoteProxy {
private BiConsumer<RRemoteServiceResponse, Throwable> createResponseListener() {
return (response, e) -> {
if (e != null) {
if (e instanceof RedissonShutdownException) {
if (commandExecutor.getServiceManager().isShuttingDown(e)) {
return;
}

Loading…
Cancel
Save