Fixed - running scheduleWithFixedDelay Job couldn't be canceled. #1869

pull/1871/head
Nikita Koksharov 6 years ago
parent 96ef05671e
commit 0dcefca39c

@ -146,7 +146,7 @@ public class TasksRunnerService implements RemoteExecutorService {
future = service.schedule(params); future = service.schedule(params);
} }
try { try {
executeRunnable(params, nextStartDate); executeRunnable(params, nextStartDate == null);
} catch (RuntimeException e) { } catch (RuntimeException e) {
// cancel task if it throws an exception // cancel task if it throws an exception
if (future != null) { if (future != null) {
@ -178,7 +178,11 @@ public class TasksRunnerService implements RemoteExecutorService {
@Override @Override
public void scheduleWithFixedDelay(ScheduledWithFixedDelayParameters params) { public void scheduleWithFixedDelay(ScheduledWithFixedDelayParameters params) {
executeRunnable(params); executeRunnable(params, false);
if (!redisson.getMap(tasksName, StringCodec.INSTANCE).containsKey(params.getRequestId())) {
return;
}
long newStartTime = System.currentTimeMillis() + params.getDelay(); long newStartTime = System.currentTimeMillis() + params.getDelay();
params.setStartTime(newStartTime); params.setStartTime(newStartTime);
asyncScheduledServiceAtFixed(params.getExecutorId(), params.getRequestId()).scheduleWithFixedDelay(params); asyncScheduledServiceAtFixed(params.getExecutorId(), params.getRequestId()).scheduleWithFixedDelay(params);
@ -209,7 +213,7 @@ public class TasksRunnerService implements RemoteExecutorService {
} catch (Exception e) { } catch (Exception e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} finally { } finally {
finish(params.getRequestId(), null); finish(params.getRequestId(), true);
} }
} }
@ -293,7 +297,7 @@ public class TasksRunnerService implements RemoteExecutorService {
} }
} }
public void executeRunnable(TaskParameters params, Date nextDate) { public void executeRunnable(TaskParameters params, boolean removeTask) {
if (params.getRequestId() != null && params.getRequestId().startsWith("00")) { if (params.getRequestId() != null && params.getRequestId().startsWith("00")) {
renewRetryTime(params.getRequestId()); renewRetryTime(params.getRequestId());
} }
@ -308,13 +312,13 @@ public class TasksRunnerService implements RemoteExecutorService {
} catch (Exception e) { } catch (Exception e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} finally { } finally {
finish(params.getRequestId(), nextDate); finish(params.getRequestId(), removeTask);
} }
} }
@Override @Override
public void executeRunnable(TaskParameters params) { public void executeRunnable(TaskParameters params) {
executeRunnable(params, null); executeRunnable(params, true);
} }
/** /**
@ -327,9 +331,9 @@ public class TasksRunnerService implements RemoteExecutorService {
* *
* @param requestId * @param requestId
*/ */
private void finish(String requestId, Date nextDate) { private void finish(String requestId, boolean removeTask) {
String script = ""; String script = "";
if (nextDate == null) { if (removeTask) {
script += "local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);" script += "local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);"
+ "if scheduled == false then " + "if scheduled == false then "
+ "redis.call('hdel', KEYS[4], ARGV[3]); " + "redis.call('hdel', KEYS[4], ARGV[3]); "
@ -345,7 +349,7 @@ public class TasksRunnerService implements RemoteExecutorService {
+ "end;" + "end;"
+ "end;"; + "end;";
commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID, commandExecutor.evalWrite(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
script, script,
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopicName, tasksName, schedulerQueueName, tasksRetryIntervalName), Arrays.<Object>asList(tasksCounterName, statusName, terminationTopicName, tasksName, schedulerQueueName, tasksRetryIntervalName),
RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, requestId); RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, requestId);

@ -345,6 +345,42 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
} }
} }
public static class ScheduledRunnableTask2 implements Runnable, Serializable {
private static final long serialVersionUID = -3523561767248576192L;
private String key;
@RInject
private RedissonClient redisson;
public ScheduledRunnableTask2(String key) {
this.key = key;
}
@Override
public void run() {
System.out.println("job is running");
try {
redisson.getAtomicLong(key).incrementAndGet();
Thread.sleep(15000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("job is over");
}
}
@Test
public void testCancelAtFixedDelay2() throws InterruptedException, ExecutionException {
RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(30, TimeUnit.MINUTES));
executor.registerWorkers(5);
RScheduledFuture<?> future1 = executor.scheduleWithFixedDelay(new ScheduledRunnableTask2("executed1"), 1, 2, TimeUnit.SECONDS);
Thread.sleep(5000);
assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(1);
assertThat(executor.cancelTask(future1.getTaskId())).isTrue();
Thread.sleep(30000);
assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(1);
}
@Test @Test
public void testCancelAtFixedRate() throws InterruptedException, ExecutionException { public void testCancelAtFixedRate() throws InterruptedException, ExecutionException {

Loading…
Cancel
Save