|
|
|
@ -21,13 +21,17 @@ import org.junit.After;
|
|
|
|
|
import org.junit.Before;
|
|
|
|
|
import org.junit.Test;
|
|
|
|
|
import org.redisson.BaseTest;
|
|
|
|
|
import org.redisson.RedisRunner;
|
|
|
|
|
import org.redisson.Redisson;
|
|
|
|
|
import org.redisson.RedissonNode;
|
|
|
|
|
import org.redisson.api.ExecutorOptions;
|
|
|
|
|
import org.redisson.api.RExecutorBatchFuture;
|
|
|
|
|
import org.redisson.api.RExecutorFuture;
|
|
|
|
|
import org.redisson.api.RExecutorService;
|
|
|
|
|
import org.redisson.api.RedissonClient;
|
|
|
|
|
import org.redisson.config.Config;
|
|
|
|
|
import org.redisson.config.RedissonNodeConfig;
|
|
|
|
|
import org.redisson.connection.balancer.RandomLoadBalancer;
|
|
|
|
|
|
|
|
|
|
import mockit.Invocation;
|
|
|
|
|
import mockit.Mock;
|
|
|
|
@ -127,7 +131,96 @@ public class RedissonExecutorServiceTest extends BaseTest {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testTaskFailover() throws Exception {
|
|
|
|
|
public void testFailoverInSentinel() throws Exception {
|
|
|
|
|
RedisRunner.RedisProcess master = new RedisRunner()
|
|
|
|
|
.nosave()
|
|
|
|
|
.randomDir()
|
|
|
|
|
.run();
|
|
|
|
|
RedisRunner.RedisProcess slave1 = new RedisRunner()
|
|
|
|
|
.port(6380)
|
|
|
|
|
.nosave()
|
|
|
|
|
.randomDir()
|
|
|
|
|
.slaveof("127.0.0.1", 6379)
|
|
|
|
|
.run();
|
|
|
|
|
RedisRunner.RedisProcess slave2 = new RedisRunner()
|
|
|
|
|
.port(6381)
|
|
|
|
|
.nosave()
|
|
|
|
|
.randomDir()
|
|
|
|
|
.slaveof("127.0.0.1", 6379)
|
|
|
|
|
.run();
|
|
|
|
|
RedisRunner.RedisProcess sentinel1 = new RedisRunner()
|
|
|
|
|
.nosave()
|
|
|
|
|
.randomDir()
|
|
|
|
|
.port(26379)
|
|
|
|
|
.sentinel()
|
|
|
|
|
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
|
|
|
|
|
.run();
|
|
|
|
|
RedisRunner.RedisProcess sentinel2 = new RedisRunner()
|
|
|
|
|
.nosave()
|
|
|
|
|
.randomDir()
|
|
|
|
|
.port(26380)
|
|
|
|
|
.sentinel()
|
|
|
|
|
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
|
|
|
|
|
.run();
|
|
|
|
|
RedisRunner.RedisProcess sentinel3 = new RedisRunner()
|
|
|
|
|
.nosave()
|
|
|
|
|
.randomDir()
|
|
|
|
|
.port(26381)
|
|
|
|
|
.sentinel()
|
|
|
|
|
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
|
|
|
|
|
.run();
|
|
|
|
|
|
|
|
|
|
Thread.sleep(5000);
|
|
|
|
|
|
|
|
|
|
Config config = new Config();
|
|
|
|
|
config.useSentinelServers()
|
|
|
|
|
.setLoadBalancer(new RandomLoadBalancer())
|
|
|
|
|
.addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster");
|
|
|
|
|
|
|
|
|
|
RedissonClient redisson = Redisson.create(config);
|
|
|
|
|
|
|
|
|
|
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
|
|
|
|
|
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1));
|
|
|
|
|
node = RedissonNode.create(nodeConfig);
|
|
|
|
|
node.start();
|
|
|
|
|
|
|
|
|
|
RExecutorService executor = redisson.getExecutorService("test2", ExecutorOptions.defaults().taskRetryInterval(10, TimeUnit.SECONDS));
|
|
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
|
|
executor.submit(new DelayedTask(2000, "counter"));
|
|
|
|
|
}
|
|
|
|
|
Thread.sleep(2500);
|
|
|
|
|
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(1);
|
|
|
|
|
|
|
|
|
|
master.stop();
|
|
|
|
|
System.out.println("master " + master.getRedisServerAddressAndPort() + " stopped!");
|
|
|
|
|
|
|
|
|
|
Thread.sleep(TimeUnit.SECONDS.toMillis(70));
|
|
|
|
|
|
|
|
|
|
master = new RedisRunner()
|
|
|
|
|
.port(master.getRedisServerPort())
|
|
|
|
|
.nosave()
|
|
|
|
|
.randomDir()
|
|
|
|
|
.run();
|
|
|
|
|
|
|
|
|
|
System.out.println("master " + master.getRedisServerAddressAndPort() + " started!");
|
|
|
|
|
|
|
|
|
|
Thread.sleep(25000);
|
|
|
|
|
|
|
|
|
|
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(10);
|
|
|
|
|
|
|
|
|
|
redisson.shutdown();
|
|
|
|
|
node.shutdown();
|
|
|
|
|
sentinel1.stop();
|
|
|
|
|
sentinel2.stop();
|
|
|
|
|
sentinel3.stop();
|
|
|
|
|
master.stop();
|
|
|
|
|
slave1.stop();
|
|
|
|
|
slave2.stop();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testNodeFailover() throws Exception {
|
|
|
|
|
AtomicInteger counter = new AtomicInteger();
|
|
|
|
|
new MockUp<TasksRunnerService>() {
|
|
|
|
|
@Mock
|
|
|
|
|