pull/4031/head
Nikita Koksharov 3 years ago
parent ba98b14dd6
commit 554415ef3b

@ -42,6 +42,9 @@ import java.lang.invoke.SerializedLambda;
import java.lang.ref.ReferenceQueue; import java.lang.ref.ReferenceQueue;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -979,11 +982,12 @@ public class RedissonExecutorService implements RScheduledExecutorService {
check(task); check(task);
ClassBody classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
Date startDate = cronSchedule.getExpression().getNextValidTimeAfter(new Date()); ZonedDateTime currentDate = ZonedDateTime.of(LocalDateTime.now(), ZoneId.systemDefault());
ZonedDateTime startDate = cronSchedule.getExpression().nextTimeAfter(currentDate);
if (startDate == null) { if (startDate == null) {
throw new IllegalArgumentException("Wrong cron expression! Unable to calculate start date"); throw new IllegalArgumentException("Wrong cron expression! Unable to calculate start date");
} }
long startTime = startDate.getTime(); long startTime = startDate.toInstant().toEpochMilli();
ScheduledCronExpressionParameters params = new ScheduledCronExpressionParameters(); ScheduledCronExpressionParameters params = new ScheduledCronExpressionParameters();
params.setClassName(classBody.getClazzName()); params.setClassName(classBody.getClazzName());
@ -991,14 +995,14 @@ public class RedissonExecutorService implements RScheduledExecutorService {
params.setLambdaBody(classBody.getLambda()); params.setLambdaBody(classBody.getLambda());
params.setState(state); params.setState(state);
params.setStartTime(startTime); params.setStartTime(startTime);
params.setCronExpression(cronSchedule.getExpression().getCronExpression()); params.setCronExpression(cronSchedule.getExpression().getExpr());
params.setTimezone(cronSchedule.getExpression().getTimeZone().getID()); params.setTimezone(ZoneId.systemDefault().toString());
params.setExecutorId(executorId); params.setExecutorId(executorId);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.schedule(params); RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.schedule(params);
addListener(result); addListener(result);
RedissonScheduledFuture<Void> f = new RedissonScheduledFuture<Void>(result, startTime) { RedissonScheduledFuture<Void> f = new RedissonScheduledFuture<Void>(result, startTime) {
public long getDelay(TimeUnit unit) { public long getDelay(TimeUnit unit) {
return unit.convert(startDate.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS); return unit.convert(startDate.toInstant().toEpochMilli() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}; };
}; };
storeReference(f, result.getRequestId()); storeReference(f, result.getRequestId());

@ -41,10 +41,11 @@ import org.redisson.remote.ResponseEntry;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ObjectInput; import java.io.ObjectInput;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date;
import java.util.Map; import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -139,12 +140,12 @@ public class TasksRunnerService implements RemoteExecutorService {
@Override @Override
public void schedule(ScheduledCronExpressionParameters params) { public void schedule(ScheduledCronExpressionParameters params) {
CronExpression expression = new CronExpression(params.getCronExpression()); CronExpression expression = new CronExpression(params.getCronExpression());
expression.setTimeZone(TimeZone.getTimeZone(params.getTimezone())); ZonedDateTime currentDate = ZonedDateTime.of(LocalDateTime.now(), ZoneId.of(params.getTimezone()));
Date nextStartDate = expression.getNextValidTimeAfter(new Date()); ZonedDateTime nextStartDate = expression.nextTimeAfter(currentDate);
RFuture<Void> future = null; RFuture<Void> future = null;
if (nextStartDate != null) { if (nextStartDate != null) {
RemoteExecutorServiceAsync service = asyncScheduledServiceAtFixed(params.getExecutorId(), params.getRequestId()); RemoteExecutorServiceAsync service = asyncScheduledServiceAtFixed(params.getExecutorId(), params.getRequestId());
params.setStartTime(nextStartDate.getTime()); params.setStartTime(nextStartDate.toInstant().toEpochMilli());
future = service.schedule(params); future = service.schedule(params);
} }
try { try {

@ -161,7 +161,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
@Test @Test
public void testDelay() throws InterruptedException { public void testDelay() {
RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS)); RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS));
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
RScheduledFuture<?> f = executor.schedule(new ScheduledCallableTask(), 11, TimeUnit.SECONDS); RScheduledFuture<?> f = executor.schedule(new ScheduledCallableTask(), 11, TimeUnit.SECONDS);
@ -281,9 +281,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
} }
@Test @Test
public void testCronExpression() throws InterruptedException, ExecutionException { public void testCronExpression() throws InterruptedException {
RScheduledExecutorService executor = redisson.getExecutorService("test"); RScheduledExecutorService executor = redisson.getExecutorService("test");
executor.schedule(new ScheduledRunnableTask("executed"), CronSchedule.of("0/2 * * * * ?")); executor.schedule(new ScheduledRunnableTask("executed"), CronSchedule.of("0/2 * * * * ?"));
Thread.sleep(4000); Thread.sleep(4000);
assertThat(redisson.getAtomicLong("executed").get()).isEqualTo(2); assertThat(redisson.getAtomicLong("executed").get()).isEqualTo(2);
} }
@ -307,7 +307,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
} }
@Test @Test
public void testWrongCronExpression() throws InterruptedException, ExecutionException { public void testWrongCronExpression() {
Assertions.assertThrows(IllegalArgumentException.class, () -> { Assertions.assertThrows(IllegalArgumentException.class, () -> {
RScheduledExecutorService executor = redisson.getExecutorService("test"); RScheduledExecutorService executor = redisson.getExecutorService("test");
executor.schedule(new ScheduledRunnableTask("executed"), CronSchedule.of("0 44 12 19 JUN ? 2018")); executor.schedule(new ScheduledRunnableTask("executed"), CronSchedule.of("0 44 12 19 JUN ? 2018"));
@ -315,7 +315,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
} }
@Test @Test
public void testCronExpressionMultipleTasks() throws InterruptedException, ExecutionException { public void testCronExpressionMultipleTasks() throws InterruptedException {
RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(2, TimeUnit.SECONDS)); RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(2, TimeUnit.SECONDS));
executor.schedule(new ScheduledRunnableTask("executed1"), CronSchedule.of("0/5 * * * * ?")); executor.schedule(new ScheduledRunnableTask("executed1"), CronSchedule.of("0/5 * * * * ?"));
executor.schedule(new ScheduledRunnableTask("executed2"), CronSchedule.of("0/1 * * * * ?")); executor.schedule(new ScheduledRunnableTask("executed2"), CronSchedule.of("0/1 * * * * ?"));
@ -338,7 +338,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
} }
@Test @Test
public void testCancel2() throws InterruptedException { public void testCancel2() {
Assertions.assertTimeout(Duration.ofSeconds(15), () -> { Assertions.assertTimeout(Duration.ofSeconds(15), () -> {
RScheduledExecutorService e = redisson.getExecutorService("myExecutor"); RScheduledExecutorService e = redisson.getExecutorService("myExecutor");
e.registerWorkers(WorkerOptions.defaults()); e.registerWorkers(WorkerOptions.defaults());
@ -481,7 +481,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
} }
@Test @Test
public void testCancelAtFixedDelay2() throws InterruptedException, ExecutionException { public void testCancelAtFixedDelay2() throws InterruptedException {
RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(30, TimeUnit.MINUTES)); RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(30, TimeUnit.MINUTES));
executor.registerWorkers(WorkerOptions.defaults().workers(5)); executor.registerWorkers(WorkerOptions.defaults().workers(5));
RScheduledFuture<?> future1 = executor.scheduleWithFixedDelay(new ScheduledRunnableTask2("executed1"), 1, 2, TimeUnit.SECONDS); RScheduledFuture<?> future1 = executor.scheduleWithFixedDelay(new ScheduledRunnableTask2("executed1"), 1, 2, TimeUnit.SECONDS);

Loading…
Cancel
Save