diff --git a/redisson/src/test/java/org/redisson/RedisClientTest.java b/redisson/src/test/java/org/redisson/RedisClientTest.java index 17993c3ed..95a818a70 100644 --- a/redisson/src/test/java/org/redisson/RedisClientTest.java +++ b/redisson/src/test/java/org/redisson/RedisClientTest.java @@ -12,9 +12,6 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.config.Protocol; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; import java.util.ArrayList; import java.util.Arrays; @@ -25,21 +22,15 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; -@Testcontainers public class RedisClientTest { - @Container - private static final GenericContainer REDIS = - new GenericContainer<>("redis:7.2") - .withExposedPorts(6379); - private static RedisClient redisClient; @BeforeAll public static void beforeAll() { RedisClientConfig config = new RedisClientConfig(); config.setProtocol(Protocol.RESP3); - config.setAddress("redis://127.0.0.1:" + REDIS.getFirstMappedPort()); + config.setAddress("redis://127.0.0.1:" + RedisDockerTest.REDIS.getFirstMappedPort()); redisClient = RedisClient.create(config); } diff --git a/redisson/src/test/java/org/redisson/RedisDockerTest.java b/redisson/src/test/java/org/redisson/RedisDockerTest.java index 1ee532de1..2f2b15a62 100644 --- a/redisson/src/test/java/org/redisson/RedisDockerTest.java +++ b/redisson/src/test/java/org/redisson/RedisDockerTest.java @@ -5,7 +5,6 @@ import com.github.dockerjava.api.model.ContainerNetwork; import com.github.dockerjava.api.model.ExposedPort; import com.github.dockerjava.api.model.PortBinding; import com.github.dockerjava.api.model.Ports; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.redisson.api.NatMapper; import org.redisson.api.RedissonClient; @@ -20,10 +19,12 @@ import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupC import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; import java.io.File; +import java.io.IOException; import java.time.Duration; import java.util.*; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; public class RedisDockerTest { @@ -453,7 +454,7 @@ public class RedisDockerTest { network.close(); } - protected void withNewCluster(Consumer callback) { + protected void withNewCluster(BiConsumer, RedissonClient> callback) { LogMessageWaitStrategy wait2 = new LogMessageWaitStrategy().withRegEx(".*REPLICA\ssync\\:\sFinished\swith\ssuccess.*"); @@ -515,13 +516,53 @@ public class RedisDockerTest { RedissonClient redisson = Redisson.create(config); try { - callback.accept(redisson); + callback.accept(nodes, redisson); } finally { redisson.shutdown(); environment.stop(); } } + protected String execute(ContainerState node, String... commands) { + try { + return node.execInContainer(commands).getStdout(); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + protected List getSlaveNodes(List nodes) { + return nodes.stream().filter(node -> { + if (!node.isRunning()) { + return false; + } + String r = execute(node, "redis-cli", "info", "replication"); + if (r.contains("role:slave")) { + return true; + } + return false; + }).collect(Collectors.toList()); + } + + protected List getMasterNodes(List nodes) { + return nodes.stream().filter(node -> { + if (!node.isRunning()) { + return false; + } + String r = execute(node, "redis-cli", "info", "replication"); + if (r.contains("role:master")) { + return true; + } + return false; + }).collect(Collectors.toList()); + } + + protected void stop(ContainerState node) { + execute(node, "redis-cli", "shutdown"); + } + protected void restart(GenericContainer redis) { redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":" + redis.getExposedPorts().get(0))); redis.stop(); diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index cbdbd91ea..9b8a132c9 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -43,7 +43,7 @@ public class RedissonBatchTest extends RedisDockerTest { @ParameterizedTest @MethodSource("data") public void testSlotMigrationInCluster(BatchOptions batchOptions) { - withNewCluster(redissonClient -> { + withNewCluster((nodes, redissonClient) -> { Config config = redissonClient.getConfig(); config.useClusterServers() .setScanInterval(1000) diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 4da315c37..2b268e494 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -8,13 +8,8 @@ import org.redisson.api.Entry; import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; -import org.redisson.api.redisnode.RedisCluster; -import org.redisson.api.redisnode.RedisClusterMaster; -import org.redisson.api.redisnode.RedisNodes; -import org.redisson.client.RedisClient; -import org.redisson.client.RedisClientConfig; -import org.redisson.client.protocol.RedisCommands; import org.redisson.config.Config; +import org.testcontainers.containers.ContainerState; import org.testcontainers.containers.GenericContainer; import java.io.IOException; @@ -154,7 +149,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { @Test public void testTakeReattachCluster() { - withNewCluster(redisson -> { + withNewCluster((nodes, redisson) -> { List> futures = new ArrayList<>(); for (int i = 0; i < 10; i++) { RBlockingQueue queue = redisson.getBlockingQueue("queue" + i); @@ -168,14 +163,8 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { throw new RuntimeException(e); } - RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER); - Optional ff = rnc.getMasters().stream().findFirst(); - RedisClusterMaster master = ff.get(); - RedisClientConfig cc = new RedisClientConfig(); - cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort()); - RedisClient c = RedisClient.create(cc); - c.connect().async(RedisCommands.SHUTDOWN); - c.shutdown(); + List masters = getMasterNodes(nodes); + stop(masters.get(0)); try { Thread.sleep(TimeUnit.SECONDS.toMillis(30)); @@ -186,7 +175,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { for (int i = 0; i < 10; i++) { RBlockingQueue queue = redisson.getBlockingQueue("queue" + i); try { - queue.put(i*100); + queue.put(i * 100); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -200,7 +189,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { // skip } Integer result = f.toCompletableFuture().getNow(null); - assertThat(result).isEqualTo(i*100); + assertThat(result).isEqualTo(i * 100); } redisson.shutdown(); diff --git a/redisson/src/test/java/org/redisson/RedissonFailoverTest.java b/redisson/src/test/java/org/redisson/RedissonFailoverTest.java index 28dad113a..c2b5f1290 100644 --- a/redisson/src/test/java/org/redisson/RedissonFailoverTest.java +++ b/redisson/src/test/java/org/redisson/RedissonFailoverTest.java @@ -1,6 +1,5 @@ package org.redisson; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; @@ -9,7 +8,8 @@ import org.redisson.api.redisnode.RedisNodes; import org.redisson.config.Config; import org.redisson.config.ReadMode; import org.redisson.config.SubscriptionMode; -import org.redisson.connection.balancer.RandomLoadBalancer; +import org.testcontainers.containers.ContainerState; +import org.testcontainers.containers.GenericContainer; import java.net.InetSocketAddress; import java.util.*; @@ -17,380 +17,294 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; -public class RedissonFailoverTest { +public class RedissonFailoverTest extends RedisDockerTest { @Test - public void testFailoverInCluster() throws Exception { - RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); - RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); - RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); - RedisRunner slave4 = new RedisRunner().port(6903).randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1, slave4) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterRunner.ClusterProcesses process = clusterRunner.run(); - - Thread.sleep(7000); - - Config config = new Config(); - config.useClusterServers() - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - RedisRunner.RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get(); - - List> futures = new ArrayList>(); - CountDownLatch latch = new CountDownLatch(1); - Thread t = new Thread() { - public void run() { - for (int i = 0; i < 2000; i++) { - RFuture f1 = redisson.getBucket("i" + i).getAsync(); - RFuture f2 = redisson.getBucket("i" + i).setAsync(""); - RFuture f3 = redisson.getTopic("topic").publishAsync("testmsg"); - futures.add(f1); - futures.add(f2); - futures.add(f3); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + public void testFailoverInCluster() { + withNewCluster((nodes, redisson) -> { + List masters = getMasterNodes(nodes); + + List> futures = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread() { + public void run() { + for (int i = 0; i < 2000; i++) { + RFuture f1 = redisson.getBucket("i" + i).getAsync(); + RFuture f2 = redisson.getBucket("i" + i).setAsync(""); + RFuture f3 = redisson.getTopic("topic").publishAsync("testmsg"); + futures.add(f1); + futures.add(f2); + futures.add(f3); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + if (i % 100 == 0) { + System.out.println("step: " + i); + } } - if (i % 100 == 0) { - System.out.println("step: " + i); - } - } - latch.countDown(); + latch.countDown(); + }; }; - }; - t.start(); - t.join(1000); - - Set oldMasters = new HashSet<>(); - Collection masterNodes = redisson.getRedisNodes(org.redisson.api.redisnode.RedisNodes.CLUSTER).getMasters(); - for (RedisClusterMaster clusterNode : masterNodes) { - oldMasters.add(clusterNode.getAddr()); - } - - master.stop(); - System.out.println("master " + master.getRedisServerAddressAndPort() + " has been stopped!"); - - TimeUnit.SECONDS.sleep(15); - - RedisRunner.RedisProcess newMaster = null; - Collection newMasterNodes = redisson.getRedisNodes(RedisNodes.CLUSTER).getMasters(); - for (RedisClusterMaster clusterNode : newMasterNodes) { - if (!oldMasters.contains(clusterNode.getAddr())) { - newMaster = process.getNodes().stream().filter(x -> x.getRedisServerPort() == clusterNode.getAddr().getPort()).findFirst().get(); - break; + t.start(); + try { + t.join(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - } - assertThat(newMaster).isNotNull(); + stop(masters.get(0)); + System.out.println("master " + masters.get(0).getFirstMappedPort() + " has been stopped!"); - TimeUnit.SECONDS.sleep(30); - - newMaster.stop(); + try { + TimeUnit.SECONDS.sleep(25); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - System.out.println("new master " + newMaster.getRedisServerAddressAndPort() + " has been stopped!"); + List newMasters = getMasterNodes(nodes); + newMasters.removeAll(masters); + assertThat(newMasters).hasSize(1); - assertThat(latch.await(180, TimeUnit.SECONDS)).isTrue(); + try { + TimeUnit.SECONDS.sleep(30); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - int errors = 0; - int success = 0; - int readonlyErrors = 0; + stop(newMasters.get(0)); + System.out.println("new master " + newMasters.get(0).getFirstMappedPort() + " has been stopped!"); - for (RFuture rFuture : futures) { try { - rFuture.toCompletableFuture().join(); - success++; - } catch (Exception e) { - errors++; + assertThat(latch.await(180, TimeUnit.SECONDS)).isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - } - redisson.shutdown(); - process.shutdown(); + int errors = 0; + int success = 0; + int readonlyErrors = 0; + + for (RFuture rFuture : futures) { + try { + rFuture.toCompletableFuture().join(); + success++; + } catch (Exception e) { + errors++; + } + } - assertThat(readonlyErrors).isZero(); - assertThat(errors).isLessThan(1000); - assertThat(success).isGreaterThan(5000); + assertThat(readonlyErrors).isZero(); + assertThat(errors).isLessThan(2900); + assertThat(success).isGreaterThan(3000); + }); } @Test - public void testFailoverInClusterSlave() throws Exception { - RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); - RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); - RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterRunner.ClusterProcesses process = clusterRunner.run(); - - Thread.sleep(7000); - - Config config = new Config(); - config.useClusterServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - RedisRunner.RedisProcess slave = process.getNodes().stream().filter(x -> x.getRedisServerPort() == slave1.getPort()).findFirst().get(); - - List> futures = new ArrayList>(); - CountDownLatch latch = new CountDownLatch(1); - Thread t = new Thread() { - public void run() { - for (int i = 0; i < 600; i++) { - RFuture f1 = redisson.getBucket("i" + i).getAsync(); - futures.add(f1); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + public void testFailoverInClusterSlave() { + withNewCluster((nodes, redisson) -> { + ContainerState slave = getSlaveNodes(nodes).get(0); + + List> futures = new ArrayList>(); + CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread() { + public void run() { + for (int i = 0; i < 600; i++) { + RFuture f1 = redisson.getBucket("i" + i).getAsync(); + futures.add(f1); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + if (i % 100 == 0) { + System.out.println("step: " + i); + } } - if (i % 100 == 0) { - System.out.println("step: " + i); - } - } - latch.countDown(); + latch.countDown(); + }; }; - }; - t.start(); - t.join(1000); - - slave.restart(20); - System.out.println("slave " + slave.getRedisServerAddressAndPort() + " has been stopped!"); - - assertThat(latch.await(70, TimeUnit.SECONDS)).isTrue(); + t.start(); + try { + t.join(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - int errors = 0; - int success = 0; - int readonlyErrors = 0; + slave.getDockerClient().pauseContainerCmd(slave.getContainerId()).exec(); + System.out.println("slave " + slave.getFirstMappedPort() + " has been stopped!"); + try { + TimeUnit.SECONDS.sleep(20); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + slave.getDockerClient().unpauseContainerCmd(slave.getContainerId()).exec(); - for (RFuture rFuture : futures) { try { - rFuture.toCompletableFuture().join(); - success++; - } catch (Exception e) { - e.printStackTrace(); - errors++; - // skip + assertThat(latch.await(70, TimeUnit.SECONDS)).isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - } - redisson.shutdown(); - process.shutdown(); + int errors = 0; + int success = 0; + int readonlyErrors = 0; + + for (RFuture rFuture : futures) { + try { + rFuture.toCompletableFuture().join(); + success++; + } catch (Exception e) { + errors++; + // skip + } + } - assertThat(readonlyErrors).isZero(); - assertThat(errors).isLessThan(200); - assertThat(success).isGreaterThan(600 - 200); - assertThat(futures.get(futures.size() - 1).isDone()).isTrue(); - assertThat(futures.get(futures.size() - 1).toCompletableFuture().isCompletedExceptionally()).isFalse(); + assertThat(readonlyErrors).isZero(); + assertThat(errors).isBetween(15, 200); + assertThat(success).isGreaterThan(600 - 200); + assertThat(futures.get(futures.size() - 1).isDone()).isTrue(); + assertThat(futures.get(futures.size() - 1).toCompletableFuture().isCompletedExceptionally()).isFalse(); + }); } @Test public void testFailoverInSentinel() throws Exception { - RedisRunner.RedisProcess master = new RedisRunner() - .nosave() - .randomDir() - .run(); - RedisRunner.RedisProcess slave1 = new RedisRunner() - .port(6380) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6379) - .run(); - RedisRunner.RedisProcess slave2 = new RedisRunner() - .port(6381) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6379) - .run(); - RedisRunner.RedisProcess sentinel1 = new RedisRunner() - .nosave() - .randomDir() - .port(26379) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) - .run(); - RedisRunner.RedisProcess sentinel2 = new RedisRunner() - .nosave() - .randomDir() - .port(26380) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) - .run(); - RedisRunner.RedisProcess sentinel3 = new RedisRunner() - .nosave() - .randomDir() - .port(26381) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) - .run(); - - Thread.sleep(5000); - - Config config = new Config(); - config.useSentinelServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster"); - RedissonClient redisson = Redisson.create(config); - - List> futures = new ArrayList>(); - CountDownLatch latch = new CountDownLatch(1); - Thread t = new Thread() { - public void run() { - for (int i = 0; i < 1000; i++) { - RFuture f1 = redisson.getBucket("i" + i).getAsync(); - RFuture f2 = redisson.getBucket("i" + i).setAsync(""); - RFuture f3 = redisson.getTopic("topic").publishAsync("testmsg"); - futures.add(f1); - futures.add(f2); - futures.add(f3); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + withSentinel((nodes, config) -> { + RedissonClient redisson = Redisson.create(config); + + List> futures = new ArrayList>(); + CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread() { + public void run() { + for (int i = 0; i < 1000; i++) { + RFuture f1 = redisson.getBucket("i" + i).getAsync(); + RFuture f2 = redisson.getBucket("i" + i).setAsync(""); + RFuture f3 = redisson.getTopic("topic").publishAsync("testmsg"); + futures.add(f1); + futures.add(f2); + futures.add(f3); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } - } - latch.countDown(); + latch.countDown(); + }; }; - }; - t.start(); - t.join(1000); - - master.stop(); - System.out.println("master " + master.getRedisServerAddressAndPort() + " stopped!"); - - Thread.sleep(TimeUnit.SECONDS.toMillis(70)); - - master = new RedisRunner() - .port(master.getRedisServerPort()) - .nosave() - .randomDir() - .run(); + t.start(); + try { + t.join(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - System.out.println("master " + master.getRedisServerAddressAndPort() + " started!"); + GenericContainer master = nodes.get(0); + master.setPortBindings(Arrays.asList(master.getFirstMappedPort() + ":" + master.getExposedPorts().get(0))); + master.stop(); + System.out.println("master has been stopped!"); + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(70)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - Thread.sleep(15000); + master.start(); + System.out.println("master has been started!"); - assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); - int errors = 0; - int success = 0; - int readonlyErrors = 0; + try { + Thread.sleep(15000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - for (RFuture rFuture : futures) { try { - rFuture.toCompletableFuture().join(); - success++; - } catch (CompletionException e) { - if (e.getCause().getMessage().contains("READONLY You can't write against")) { - readonlyErrors++; + assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + int errors = 0; + int success = 0; + int readonlyErrors = 0; + + for (RFuture rFuture : futures) { + try { + rFuture.toCompletableFuture().join(); + success++; + } catch (CompletionException e) { + if (e.getCause().getMessage().contains("READONLY You can't write against")) { + readonlyErrors++; + } + errors++; + // skip } - errors++; - // skip } - } - - System.out.println("errors " + errors + " success " + success + " readonly " + readonlyErrors); - - assertThat(futures.get(futures.size() - 1).isDone()).isTrue(); - assertThat(futures.get(futures.size() - 1).toCompletableFuture().isCompletedExceptionally()).isFalse(); - assertThat(errors).isLessThan(820); - assertThat(readonlyErrors).isZero(); - - redisson.shutdown(); - sentinel1.stop(); - sentinel2.stop(); - sentinel3.stop(); - master.stop(); - slave1.stop(); - slave2.stop(); + + System.out.println("errors " + errors + " success " + success + " readonly " + readonlyErrors); + + assertThat(futures.get(futures.size() - 1).isDone()).isTrue(); + assertThat(futures.get(futures.size() - 1).toCompletableFuture().isCompletedExceptionally()).isFalse(); + assertThat(errors).isBetween(150, 820); + assertThat(readonlyErrors).isZero(); + + redisson.shutdown(); + }, 2); } @Test - public void testFailoverWithoutErrorsInCluster() throws Exception { - RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); - RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); - RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterRunner.ClusterProcesses process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setRetryAttempts(30) - .setReadMode(ReadMode.MASTER) - .setSubscriptionMode(SubscriptionMode.MASTER) - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - RedisRunner.RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get(); - - List> futures = new ArrayList>(); - - Set oldMasters = new HashSet<>(); - Collection masterNodes = redisson.getRedisNodes(RedisNodes.CLUSTER).getMasters(); - for (RedisClusterMaster clusterNode : masterNodes) { - oldMasters.add(clusterNode.getAddr()); - } - - master.stop(); - - for (int j = 0; j < 2000; j++) { - RFuture f2 = redisson.getBucket("" + j).setAsync(""); - futures.add(f2); - } - - System.out.println("master " + master.getRedisServerAddressAndPort() + " has been stopped!"); - - Thread.sleep(TimeUnit.SECONDS.toMillis(40)); - - RedisRunner.RedisProcess newMaster = null; - Collection newMasterNodes = redisson.getRedisNodes(RedisNodes.CLUSTER).getMasters(); - for (RedisClusterMaster clusterNode : newMasterNodes) { - if (!oldMasters.contains(clusterNode.getAddr())) { - newMaster = process.getNodes().stream().filter(x -> x.getRedisServerPort() == clusterNode.getAddr().getPort()).findFirst().get(); - break; + public void testFailoverWithoutErrorsInCluster() { + withNewCluster((nodes, redissonClient) -> { + Config config = redissonClient.getConfig(); + config.useClusterServers() + .setRetryAttempts(30) + .setReadMode(ReadMode.MASTER) + .setSubscriptionMode(SubscriptionMode.MASTER); + RedissonClient redisson = Redisson.create(config); + + List masters = getMasterNodes(nodes); + + Set oldMasters = new HashSet<>(); + Collection masterNodes = redisson.getRedisNodes(RedisNodes.CLUSTER).getMasters(); + for (RedisClusterMaster clusterNode : masterNodes) { + oldMasters.add(clusterNode.getAddr()); } - } - assertThat(newMaster).isNotNull(); + stop(masters.get(0)); + + List> futures = new ArrayList<>(); + for (int j = 0; j < 2000; j++) { + RFuture f2 = redisson.getBucket("" + j).setAsync(""); + futures.add(f2); + } + + System.out.println("master " + masters.get(0).getFirstMappedPort() + " has been stopped!"); - for (RFuture rFuture : futures) { try { + Thread.sleep(TimeUnit.SECONDS.toMillis(40)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + List newMasters = getMasterNodes(nodes); + newMasters.removeAll(masters); + assertThat(newMasters).hasSize(1); + + for (RFuture rFuture : futures) { rFuture.toCompletableFuture().join(); - } catch (Exception e) { - Assertions.fail(e.getMessage()); } - } - redisson.shutdown(); - process.shutdown(); + redisson.shutdown(); + }); } diff --git a/redisson/src/test/java/org/redisson/RedissonShardedTopicTest.java b/redisson/src/test/java/org/redisson/RedissonShardedTopicTest.java index b591132c8..c37a9e3b8 100644 --- a/redisson/src/test/java/org/redisson/RedissonShardedTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonShardedTopicTest.java @@ -8,16 +8,10 @@ import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.StatusListener; -import org.redisson.api.redisnode.RedisCluster; -import org.redisson.api.redisnode.RedisClusterMaster; -import org.redisson.api.redisnode.RedisNodes; -import org.redisson.client.RedisClient; -import org.redisson.client.RedisClientConfig; -import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; -import org.redisson.client.protocol.RedisCommands; import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; +import org.testcontainers.containers.ContainerState; import org.testcontainers.containers.GenericContainer; import java.time.Duration; @@ -50,7 +44,7 @@ public class RedissonShardedTopicTest extends RedisDockerTest { @Test public void testReattachInClusterMaster() { - withNewCluster(redissonClient -> { + withNewCluster((nodes, redissonClient) -> { Config cfg = redissonClient.getConfig(); cfg.useClusterServers() .setPingConnectionInterval(0) @@ -81,17 +75,13 @@ public class RedissonShardedTopicTest extends RedisDockerTest { assertThat(topic.countSubscribers()).isEqualTo(1); - RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER); - for (RedisClusterMaster master : rnc.getMasters()) { - RedisClientConfig cc = new RedisClientConfig(); - cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort()); - RedisClient c = RedisClient.create(cc); - RedisConnection cn = c.connect(); - List channels = cn.sync(RedisCommands.PUBSUB_SHARDCHANNELS); - if (channels.contains("3")) { - cn.async(RedisCommands.SHUTDOWN); + List masters = getMasterNodes(nodes); + for (ContainerState master : masters) { + String r = execute(master, "redis-cli", "pubsub", "shardchannels"); + if (r.contains("3")) { + stop(master); + break; } - c.shutdown(); } Awaitility.waitAtMost(Duration.ofSeconds(30)).until(() -> subscriptions.get() == 2); diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index f249c45e4..ce7a57573 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -6,7 +6,6 @@ import io.netty.util.CharsetUtil; import net.bytebuddy.utility.RandomString; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; import org.redisson.api.*; import org.redisson.api.redisnode.RedisClusterMaster; @@ -32,7 +31,6 @@ import org.redisson.config.CredentialsResolver; import org.redisson.connection.CRC16; import org.redisson.connection.ConnectionListener; import org.redisson.connection.MasterSlaveConnectionManager; -import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.misc.RedisURI; import org.testcontainers.containers.FixedHostPortGenericContainer; import org.testcontainers.containers.GenericContainer; @@ -673,7 +671,7 @@ public class RedissonTest extends RedisDockerTest { @Test public void testMovedRedirectInCluster() throws Exception { - withNewCluster(redissonClient -> { + withNewCluster((nodes, redissonClient) -> { Config config = redissonClient.getConfig(); config.useClusterServers() .setScanInterval(100000); diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index c6c611b2d..f9ccc55d3 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -1,9 +1,5 @@ package org.redisson; -import com.github.dockerjava.api.command.InspectContainerResponse; -import com.github.dockerjava.api.model.ContainerNetwork; -import com.github.dockerjava.api.model.ExposedPort; -import com.github.dockerjava.api.model.Ports; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -26,12 +22,9 @@ import org.redisson.config.SubscriptionMode; import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.misc.RedisURI; import org.testcontainers.containers.ContainerState; -import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; -import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; -import java.io.File; import java.io.Serializable; import java.time.Duration; import java.util.*; @@ -40,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -405,7 +397,7 @@ public class RedissonTopicTest extends RedisDockerTest { @Test public void testSlotMigrationInCluster() { - withNewCluster(client -> { + withNewCluster((nodes, client) -> { Config config = client.getConfig(); config.useClusterServers() .setScanInterval(1000) @@ -1170,7 +1162,7 @@ public class RedissonTopicTest extends RedisDockerTest { @Test public void testReattachInClusterSlave() { - withNewCluster(client -> { + withNewCluster((nodes2, client) -> { Config config = client.getConfig(); config.useClusterServers() .setSubscriptionMode(SubscriptionMode.SLAVE); @@ -1203,13 +1195,9 @@ public class RedissonTopicTest extends RedisDockerTest { assertThat(subscriptions.get()).isEqualTo(1); - RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); - for (RedisClusterSlave slave : nodes.getSlaves()) { - RedisClientConfig cc = new RedisClientConfig(); - cc.setAddress("redis://" + slave.getAddr().getHostString() + ":" + slave.getAddr().getPort()); - RedisClient c = RedisClient.create(cc); - c.connect().async(RedisCommands.SHUTDOWN); - c.shutdown(); + List slaves = getSlaveNodes(nodes2); + for (ContainerState slave : slaves) { + stop(slave); } await().atMost(25, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); @@ -1320,7 +1308,7 @@ public class RedissonTopicTest extends RedisDockerTest { @Test public void testReattachInClusterMaster2() { - withNewCluster(redisson -> { + withNewCluster((nodes, redisson) -> { Queue messages = new ConcurrentLinkedQueue<>(); Queue subscriptions = new ConcurrentLinkedQueue<>(); @@ -1343,14 +1331,8 @@ public class RedissonTopicTest extends RedisDockerTest { topic.addListener(String.class, (channel, msg) -> messages.add(msg)); } - RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER); - Optional f = rnc.getMasters().stream().findFirst(); - RedisClusterMaster master = f.get(); - RedisClientConfig cc = new RedisClientConfig(); - cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort()); - RedisClient c = RedisClient.create(cc); - c.connect().async(RedisCommands.SHUTDOWN); - c.shutdown(); + List masters = getMasterNodes(nodes); + stop(masters.get(0)); Awaitility.waitAtMost(Duration.ofSeconds(40)).untilAsserted(() -> { assertThat(subscriptions).hasSizeGreaterThan(125); @@ -1372,7 +1354,7 @@ public class RedissonTopicTest extends RedisDockerTest { @Test public void testReattachInClusterMaster() { - withNewCluster(redissonClient -> { + withNewCluster((nodes, redissonClient) -> { Config cfg = redissonClient.getConfig(); cfg.useClusterServers().setSubscriptionMode(SubscriptionMode.MASTER); @@ -1401,17 +1383,13 @@ public class RedissonTopicTest extends RedisDockerTest { sendCommands(redisson, "3"); - RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER); - for (RedisClusterMaster master : rnc.getMasters()) { - RedisClientConfig cc = new RedisClientConfig(); - cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort()); - RedisClient c = RedisClient.create(cc); - RedisConnection cn = c.connect(); - List channels = cn.sync(RedisCommands.PUBSUB_CHANNELS); - if (channels.contains("3")) { - cn.async(RedisCommands.SHUTDOWN); + List masters = getMasterNodes(nodes); + for (ContainerState master : masters) { + String r = execute(master, "redis-cli", "pubsub", "channels"); + if (r.contains("3")) { + stop(master); + break; } - c.shutdown(); } try { @@ -1431,7 +1409,7 @@ public class RedissonTopicTest extends RedisDockerTest { @Test public void testReattachPatternTopicListenersOnClusterFailover() { - withNewCluster(redisson -> { + withNewCluster((nodes2, redisson) -> { RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); for (RedisClusterMaster master : nodes.getMasters()) { master.setConfig("notify-keyspace-events", "K$"); @@ -1467,21 +1445,13 @@ public class RedissonTopicTest extends RedisDockerTest { return messagesReceived.get() == 100; }); - RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER); - for (RedisClusterMaster master : rnc.getMasters()) { - RedisClientConfig cc = new RedisClientConfig(); - cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort()); - RedisClient c = RedisClient.create(cc); - RedisConnection cn = c.connect(); - try { - Boolean res = cn.sync(RedisCommands.EXISTS, "i99"); - if (res) { - cn.async(RedisCommands.SHUTDOWN); - } - } catch (Exception e) { - // skip + List masters = getMasterNodes(nodes2); + for (ContainerState master : masters) { + String r = execute(master, "redis-cli", "exists", "i99"); + if (r.contains("1")) { + stop(master); + break; } - c.shutdown(); } await().atMost(30, TimeUnit.SECONDS).until(() -> {