Fixed - RScheduledExecutorService.scheduleAtFixedRate() starts multiple instances of the same task if multiple workers defined #3515

pull/3941/head
Nikita Koksharov 4 years ago
parent 60abd72ae9
commit 769f0b1271

@ -126,16 +126,16 @@ public class TasksRunnerService implements RemoteExecutorService {
@Override
public void scheduleAtFixedRate(ScheduledAtFixedRateParameters params) {
long newStartTime = System.currentTimeMillis() + params.getPeriod();
long start = System.nanoTime();
executeRunnable(params, false);
long spent = params.getSpentTime()
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
long newStartTime = System.currentTimeMillis() + Math.max(params.getPeriod() - spent, 0);
params.setStartTime(newStartTime);
RFuture<Void> future = asyncScheduledServiceAtFixed(params.getExecutorId(), params.getRequestId()).scheduleAtFixedRate(params);
try {
executeRunnable(params, false);
} catch (Exception e) {
// cancel task if it throws an exception
future.cancel(true);
throw e;
}
spent = Math.max(spent - params.getPeriod(), 0);
params.setSpentTime(spent);
asyncScheduledServiceAtFixed(params.getExecutorId(), params.getRequestId()).scheduleAtFixedRate(params);
}
@Override

@ -24,7 +24,15 @@ public class ScheduledAtFixedRateParameters extends ScheduledParameters {
private long period;
private String executorId;
private long spentTime;
public long getSpentTime() {
return spentTime;
}
public void setSpentTime(long spentTime) {
this.spentTime = spentTime;
}
public long getPeriod() {
return period;
}

@ -1,15 +1,8 @@
package org.redisson.executor;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import mockit.Invocation;
import mockit.Mock;
import mockit.MockUp;
import org.joor.Reflect;
import org.junit.After;
import org.junit.Assert;
@ -24,9 +17,15 @@ import org.redisson.api.annotation.RInject;
import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig;
import mockit.Invocation;
import mockit.Mock;
import mockit.MockUp;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonScheduledExecutorServiceTest extends BaseTest {
@ -38,7 +37,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
super.before();
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1));
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 5));
node = RedissonNode.create(nodeConfig);
node.start();
}
@ -65,6 +64,53 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
}
public static class TestTask2 implements Runnable, Serializable {
@RInject
RedissonClient redisson;
@Override
public void run() {
RList<Object> list = redisson.getList("timelist");
list.add(System.currentTimeMillis());
RAtomicLong counter = redisson.getAtomicLong("counter");
try {
if (counter.get() == 0) {
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
counter.incrementAndGet();
}
}
@Test
public void testScheduleAtFixedRate() throws InterruptedException {
RScheduledExecutorService executorService = redisson.getExecutorService("test");
executorService.scheduleAtFixedRate(new TestTask2(), 1000L, 200L, TimeUnit.MILLISECONDS);
Thread.sleep(4000);
RList<Long> list = redisson.getList("timelist");
long start = list.get(1);
list.stream().skip(1).limit(5).reduce(start, (r, e) -> {
assertThat(e - r).isLessThan(20L);
return e;
});
long start2 = list.get(5);
list.stream().skip(6).limit(15).reduce(start2, (r, e) -> {
assertThat(e - r).isBetween(160L, 310L);
return e;
});
}
@Test
public void testTTL() throws InterruptedException {
RScheduledExecutorService executor = redisson.getExecutorService("test");

Loading…
Cancel
Save