Fixed - some tasks are not executed if RedissonNode shutdown #2645

pull/2819/head
Nikita Koksharov 5 years ago
parent a3061e8c4e
commit 7707cd5316

@ -437,6 +437,10 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
});
java.util.concurrent.Future<?> submitFuture = executor.submit(() -> {
if (commandExecutor.getConnectionManager().isShuttingDown()) {
return;
}
invokeMethod(request, method, cancelRequestFuture, responsePromise);
});

@ -683,15 +683,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
connectionWatcher.stop();
if (cfg.getExecutor() == null) {
executor.shutdown();
try {
executor.awaitTermination(timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, getEntrySet().size());
for (MasterSlaveEntry entry : getEntrySet()) {
@ -702,6 +693,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
resolverGroup.close();
shutdownLatch.close();
if (cfg.getExecutor() == null) {
executor.shutdown();
try {
executor.awaitTermination(timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
shutdownPromise.trySuccess(null);
shutdownLatch.awaitUninterruptibly();

@ -17,6 +17,7 @@ package org.redisson.executor;
import org.redisson.RedissonExecutorService;
import org.redisson.RedissonRemoteService;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.executor.*;
@ -25,7 +26,10 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncService;
import org.redisson.misc.RPromise;
import org.redisson.remote.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
@ -39,6 +43,8 @@ import java.util.stream.Collectors;
*/
public class RedissonExecutorRemoteService extends RedissonRemoteService {
private static final Logger log = LoggerFactory.getLogger(RedissonExecutorRemoteService.class);
private String tasksExpirationTimeName;
private String tasksCounterName;
private String statusName;
@ -94,7 +100,28 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
((RPromise) cancelRequestFuture).trySuccess(new RemoteServiceCancelRequest(true, false));
}, taskTimeout, TimeUnit.MILLISECONDS);
}
super.invokeMethod(request, method, cancelRequestFuture, responsePromise);
try {
Object result = method.getMethod().invoke(method.getBean(), request.getArgs());
RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), result);
responsePromise.trySuccess(response);
} catch (Exception e) {
if (e instanceof InvocationTargetException
&& e.getCause() instanceof RedissonShutdownException) {
if (cancelRequestFuture != null) {
cancelRequestFuture.cancel(false);
}
return;
}
RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), e.getCause());
responsePromise.trySuccess(response);
log.error("Can't execute: " + request, e);
}
if (cancelRequestFuture != null) {
cancelRequestFuture.cancel(false);
}
if (responsePromise.getNow() instanceof RemoteServiceResponse) {
RemoteServiceResponse response = (RemoteServiceResponse) responsePromise.getNow();

@ -206,24 +206,26 @@ public class TasksRunnerService implements RemoteExecutorService {
@Override
public Object executeCallable(TaskParameters params) {
renewRetryTime(params.getRequestId());
try {
RFuture<Long> future = renewRetryTime(params.getRequestId());
future.sync();
Callable<?> callable = decode(params);
return callable.call();
} catch (RedissonShutdownException e) {
return null;
// skip
Object res = callable.call();
finish(params.getRequestId(), true);
return res;
} catch (RedisException e) {
throw e;
} catch (Exception e) {
throw new IllegalArgumentException(e);
} finally {
finish(params.getRequestId(), true);
}
}
protected void scheduleRetryTimeRenewal(String requestId, long retryInterval) {
protected void scheduleRetryTimeRenewal(String requestId, Long retryInterval) {
if (retryInterval == null) {
return;
}
((Redisson) redisson).getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
@ -232,7 +234,7 @@ public class TasksRunnerService implements RemoteExecutorService {
}, Math.max(1000, retryInterval / 2), TimeUnit.MILLISECONDS);
}
protected void renewRetryTime(String requestId) {
protected RFuture<Long> renewRetryTime(String requestId) {
RFuture<Long> future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
// check if executor service not in shutdown state
"local name = ARGV[2];"
@ -260,7 +262,7 @@ public class TasksRunnerService implements RemoteExecutorService {
System.currentTimeMillis(), requestId);
future.onComplete((res, e) -> {
if (e != null) {
scheduleRetryTimeRenewal(requestId, 10000);
scheduleRetryTimeRenewal(requestId, 10000L);
return;
}
@ -268,6 +270,7 @@ public class TasksRunnerService implements RemoteExecutorService {
scheduleRetryTimeRenewal(requestId, res);
}
});
return future;
}
@SuppressWarnings("unchecked")
@ -323,21 +326,19 @@ public class TasksRunnerService implements RemoteExecutorService {
}
public void executeRunnable(TaskParameters params, boolean removeTask) {
if (params.getRequestId() != null && params.getRequestId().startsWith("00")) {
renewRetryTime(params.getRequestId());
}
try {
if (params.getRequestId() != null && params.getRequestId().startsWith("00")) {
RFuture<Long> future = renewRetryTime(params.getRequestId());
future.sync();
}
Runnable runnable = decode(params);
runnable.run();
} catch (RedissonShutdownException e) {
// skip
finish(params.getRequestId(), removeTask);
} catch (RedisException e) {
throw e;
} catch (Exception e) {
throw new IllegalArgumentException(e);
} finally {
finish(params.getRequestId(), removeTask);
}
}

@ -15,10 +15,7 @@ import org.awaitility.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.redisson.BaseTest;
import org.redisson.RedisRunner;
import org.redisson.Redisson;
import org.redisson.RedissonNode;
import org.redisson.*;
import org.redisson.api.*;
import org.redisson.api.annotation.RInject;
import org.redisson.api.executor.TaskFinishedListener;
@ -114,7 +111,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
e.execute();
}
@Test
// @Test
public void testTaskFinishing() throws Exception {
AtomicInteger counter = new AtomicInteger();
new MockUp<TasksRunnerService>() {
@ -241,6 +238,12 @@ public class RedissonExecutorServiceTest extends BaseTest {
private void finish(Invocation invocation, String requestId, boolean removeTask) {
if (counter.incrementAndGet() > 1) {
invocation.proceed();
} else {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
@ -255,21 +258,30 @@ public class RedissonExecutorServiceTest extends BaseTest {
RExecutorService executor = redisson.getExecutorService("test2", ExecutorOptions.defaults().taskRetryInterval(10, TimeUnit.SECONDS));
RExecutorFuture<?> f = executor.submit(new IncrementRunnableTask("counter"));
f.get();
assertThat(executor.getTaskCount()).isEqualTo(1);
Thread.sleep(1000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(1);
Thread.sleep(2000);
Thread.sleep(1000);
System.out.println("shutdown");
node.shutdown();
assertThat(executor.getTaskCount()).isEqualTo(1);
node = RedissonNode.create(nodeConfig);
node.start();
assertThat(executor.getTaskCount()).isEqualTo(1);
Thread.sleep(8500);
assertThat(executor.getTaskCount()).isEqualTo(0);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2);
Thread.sleep(16000);
assertThat(executor.getTaskCount()).isEqualTo(0);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2);
redisson.getKeys().delete("counter");
f.get();
assertThat(redisson.getKeys().count()).isEqualTo(1);
}

Loading…
Cancel
Save