Fixed - task scheduled with time more than 1 hour is not executed #1619

pull/1624/head
Nikita 7 years ago
parent 618505befa
commit 5382062871

@ -212,7 +212,7 @@ public abstract class BaseRemoteService {
final RPromise<RRemoteServiceResponse> responseFuture;
if (optionsCopy.isResultExpected()) {
responseFuture = pollResponse(optionsCopy.getExecutionTimeoutInMillis(), requestId, false);
responseFuture = pollResultResponse(optionsCopy.getExecutionTimeoutInMillis(), requestId, request);
} else {
responseFuture = null;
}
@ -360,6 +360,11 @@ public abstract class BaseRemoteService {
});
}
protected <T extends RRemoteServiceResponse> RPromise<T> pollResultResponse(long timeout,
RequestId requestId, RemoteServiceRequest request) {
return pollResponse(timeout, requestId, false);
}
private <T extends RRemoteServiceResponse> RPromise<T> pollResponse(final long timeout,
final RequestId requestId, boolean insertFirst) {
final RPromise<T> responseFuture = new RedissonPromise<T>();
@ -540,7 +545,7 @@ public abstract class BaseRemoteService {
final RPromise<RRemoteServiceResponse> responseFuture;
if (optionsCopy.isResultExpected()) {
responseFuture = pollResponse(optionsCopy.getExecutionTimeoutInMillis(), requestId, false);
responseFuture = pollResultResponse(optionsCopy.getExecutionTimeoutInMillis(), requestId, request);
} else {
responseFuture = null;
}

@ -99,7 +99,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private static final Logger LOGGER = LoggerFactory.getLogger(RedissonExecutorService.class);
private static final RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.HOURS);
private static RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.HOURS);
public static final int SHUTDOWN_STATE = 1;
public static final int TERMINATED_STATE = 2;

@ -27,6 +27,8 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.executor.params.ScheduledParameters;
import org.redisson.misc.RPromise;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
@ -119,6 +121,17 @@ public class ScheduledTasksService extends TasksService {
taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
}
@Override
protected <T extends RRemoteServiceResponse> RPromise<T> pollResultResponse(long timeout, RequestId requestId,
RemoteServiceRequest request) {
if (request.getArgs()[0].getClass() == ScheduledParameters.class) {
ScheduledParameters params = (ScheduledParameters) request.getArgs()[0];
timeout += params.getStartTime() - System.currentTimeMillis();
}
return super.pollResultResponse(timeout, requestId, request);
}
@Override
protected RequestId generateRequestId() {
if (requestId == null) {

@ -43,8 +43,6 @@ public class LogHelper {
return toArrayString(object);
} else if (object instanceof Collection) {
return toCollectionString((Collection<?>) object);
} else if (object instanceof ByteBuf) {
return ((ByteBuf) object).toString(CharsetUtil.UTF_8);
} else if (object instanceof CommandData) {
CommandData cd = (CommandData)object;
return cd.getCommand() + ", params: " + LogHelper.toString(cd.getParams());

@ -17,15 +17,18 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.redisson.BaseTest;
import org.redisson.RedissonExecutorService;
import org.redisson.RedissonNode;
import org.redisson.api.CronSchedule;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.RExecutorFuture;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScheduledFuture;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig;
import mockit.Deencapsulation;
import mockit.Invocation;
import mockit.Mock;
import mockit.MockUp;
@ -58,8 +61,27 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
long start = System.currentTimeMillis();
RScheduledFuture<?> f = executor.schedule(new ScheduledCallableTask(), 11, TimeUnit.SECONDS);
assertThat(f.awaitUninterruptibly(12000)).isTrue();
assertThat(f.isSuccess()).isTrue();
assertThat(System.currentTimeMillis() - start).isBetween(11000L, 11500L);
Deencapsulation.setField(RedissonExecutorService.class, "RESULT_OPTIONS", RemoteInvocationOptions.defaults().noAck().expectResultWithin(3, TimeUnit.SECONDS));
executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS));
start = System.currentTimeMillis();
RScheduledFuture<?> f1 = executor.schedule(new ScheduledCallableTask(), 5, TimeUnit.SECONDS);
assertThat(f1.awaitUninterruptibly(6000)).isTrue();
assertThat(f1.isSuccess()).isTrue();
assertThat(System.currentTimeMillis() - start).isBetween(5000L, 5500L);
start = System.currentTimeMillis();
RScheduledFuture<?> f2 = executor.schedule(new RunnableTask(), 5, TimeUnit.SECONDS);
assertThat(f2.awaitUninterruptibly(6000)).isTrue();
assertThat(f2.isSuccess()).isTrue();
assertThat(System.currentTimeMillis() - start).isBetween(5000L, 5500L);
}
@Test
public void testScheduleWithFixedDelay() throws InterruptedException {
RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS));
executor.scheduleWithFixedDelay(new IncrementRunnableTask("counter"), 0, 7, TimeUnit.SECONDS);
Thread.sleep(500);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(1);

Loading…
Cancel
Save