Merge branch 'master' of github.com:redisson/redisson

pull/5186/head
Nikita Koksharov 2 years ago
commit d9c9d66da0

@ -56,7 +56,7 @@
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.1.210</version>
<version>2.2.220</version>
<scope>test</scope>
</dependency>

@ -139,21 +139,15 @@ public class TasksRunnerService implements RemoteExecutorService {
CronExpression expression = new CronExpression(params.getCronExpression());
expression.setTimeZone(TimeZone.getTimeZone(params.getTimezone()));
Date nextStartDate = expression.getNextValidTimeAfter(new Date());
RFuture<Void> future = null;
if (nextStartDate != null) {
RemoteExecutorServiceAsync service = asyncScheduledServiceAtFixed(params.getExecutorId(), params.getRequestId());
params.setStartTime(nextStartDate.getTime());
future = service.schedule(params);
}
try {
executeRunnable(params, nextStartDate == null);
} catch (Exception e) {
// cancel task if it throws an exception
if (future != null) {
future.cancel(true);
}
throw e;
if (nextStartDate == null || !redisson.getMap(tasksName, StringCodec.INSTANCE).containsKey(params.getRequestId())) {
return;
}
params.setStartTime(nextStartDate.getTime());
asyncScheduledServiceAtFixed(params.getExecutorId(), params.getRequestId()).schedule(params);
}
/**

@ -392,6 +392,42 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
assertThat(redisson.getAtomicLong("executed2").get()).isEqualTo(30);
}
@Test
public void testCancelCronExpression() throws InterruptedException, ExecutionException {
RScheduledExecutorService executor = redisson.getExecutorService("test");
RScheduledFuture<?> future = executor.schedule(new ScheduledRunnableTask("executed"), CronSchedule.of("0/2 * * * * ?"));
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
assertThat(redisson.getAtomicLong("executed").get()).isEqualTo(5);
cancel(future);
Thread.sleep(TimeUnit.SECONDS.toMillis(3));
assertThat(redisson.getAtomicLong("executed").get()).isEqualTo(5);
executor.delete();
redisson.getKeys().delete("executed");
assertThat(redisson.getKeys().count()).isZero();
}
@Test
public void testCancelAndInterruptCronExpression() throws InterruptedException, ExecutionException {
RScheduledExecutorService executor = redisson.getExecutorService("test");
RScheduledFuture<?> future = executor.schedule(new ScheduledLongRepeatableTask("counter", "executed"), CronSchedule.of("0/2 * * * * ?"));
Thread.sleep(TimeUnit.SECONDS.toMillis(6));
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);
cancel(future);
Thread.sleep(50);
assertThat(redisson.<Long>getBucket("executed").get()).isGreaterThan(1000L);
Thread.sleep(TimeUnit.SECONDS.toMillis(3));
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);
executor.delete();
redisson.getKeys().delete("counter", "executed");
assertThat(redisson.getKeys().count()).isZero();
}
public static class RunnableTask2 implements Runnable, Serializable {
@Override

Loading…
Cancel
Save