From 77ddce68a2eedd0169b696965e7c22fe0bcc3c83 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 23 Nov 2023 09:30:00 +0300 Subject: [PATCH] refactoring --- .../java/org/redisson/RedissonTopicTest.java | 342 ++++++++++-------- 1 file changed, 199 insertions(+), 143 deletions(-) diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 88904258e..505051099 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -1,12 +1,13 @@ package org.redisson; -import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.command.InspectContainerResponse; import com.github.dockerjava.api.model.ContainerNetwork; import com.github.dockerjava.api.model.ExposedPort; import com.github.dockerjava.api.model.PortBinding; import com.github.dockerjava.api.model.Ports; import org.awaitility.Awaitility; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner.KEYSPACE_EVENTS_OPTIONS; import org.redisson.RedisRunner.RedisProcess; @@ -24,14 +25,15 @@ import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.cluster.ClusterNodeInfo; import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; -import org.redisson.connection.SequentialDnsAddressResolverFactory; import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.misc.RedisURI; -import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.ContainerState; +import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.time.Duration; @@ -1000,9 +1002,6 @@ public class RedissonTopicTest extends RedisDockerTest { Thread.sleep(5000); Config config = new Config(); - config.setAddressResolverGroupFactory(new SequentialDnsAddressResolverFactory() { - - }); config.useSentinelServers() .setNatMapper(new NatMapper() { @@ -1191,7 +1190,6 @@ public class RedissonTopicTest extends RedisDockerTest { @Override public void onSubscribe(String channel) { - System.out.println("onSubscribe " + (System.currentTimeMillis() - t)); subscriptions.incrementAndGet(); } }); @@ -1313,72 +1311,63 @@ public class RedissonTopicTest extends RedisDockerTest { } @Test - public void testReattachInClusterSlave() throws Exception { - GenericContainer redisCluster = new GenericContainer<>("vishnunair/docker-redis-cluster") - .withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384) - .withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10))); - redisCluster.start(); + public void testReattachInClusterSlave() { + withCluster(client -> { + Config config = client.getConfig(); + config.useClusterServers() + .setSubscriptionMode(SubscriptionMode.SLAVE); + RedissonClient redisson = Redisson.create(config); - Config config = new Config(); - config.useClusterServers() - .setNatMapper(new NatMapper() { - @Override - public RedisURI map(RedisURI uri) { - if (redisCluster.getMappedPort(uri.getPort()) == null) { - return uri; - } - return new RedisURI(uri.getScheme(), redisCluster.getHost(), redisCluster.getMappedPort(uri.getPort())); - } - }) - .setSubscriptionMode(SubscriptionMode.SLAVE) - .addNodeAddress("redis://127.0.0.1:" + redisCluster.getFirstMappedPort()); - RedissonClient redisson = Redisson.create(config); - - final AtomicBoolean executed = new AtomicBoolean(); - final AtomicInteger subscriptions = new AtomicInteger(); - - RTopic topic = redisson.getTopic("topic"); - topic.addListener(new StatusListener() { - - @Override - public void onUnsubscribe(String channel) { - } - - @Override - public void onSubscribe(String channel) { - subscriptions.incrementAndGet(); - } - }); - topic.addListener(Integer.class, new MessageListener() { - @Override - public void onMessage(CharSequence channel, Integer msg) { - executed.set(true); + final AtomicBoolean executed = new AtomicBoolean(); + final AtomicInteger subscriptions = new AtomicInteger(); + + RTopic topic = redisson.getTopic("topic"); + topic.addListener(new StatusListener() { + + @Override + public void onUnsubscribe(String channel) { + } + + @Override + public void onSubscribe(String channel) { + subscriptions.incrementAndGet(); + } + }); + topic.addListener(Integer.class, new MessageListener() { + @Override + public void onMessage(CharSequence channel, Integer msg) { + executed.set(true); + } + }); + assertThat(topic.countListeners()).isEqualTo(2); + + sendCommands(redisson, "topic"); + + assertThat(subscriptions.get()).isEqualTo(1); + + RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); + for (RedisClusterSlave slave : nodes.getSlaves()) { + RedisClientConfig cc = new RedisClientConfig(); + cc.setAddress("redis://" + slave.getAddr().getHostString() + ":" + slave.getAddr().getPort()); + RedisClient c = RedisClient.create(cc); + c.connect().async(RedisCommands.SHUTDOWN); + c.shutdown(); } - }); - assertThat(topic.countListeners()).isEqualTo(2); - - sendCommands(redisson, "topic"); - RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); - for (RedisClusterSlave slave : nodes.getSlaves()) { - RedisClientConfig cc = new RedisClientConfig(); - cc.setAddress("redis://" + slave.getAddr().getHostString() + ":" + slave.getAddr().getPort()); - RedisClient c = RedisClient.create(cc); - c.connect().async(RedisCommands.SHUTDOWN); - c.shutdown(); - Thread.sleep(18000); - } + await().atMost(25, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); - Thread.sleep(15000); + executed.set(false); + redisson.getTopic("topic").publish(1); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertThat(executed.get()).isTrue(); + assertThat(topic.countListeners()).isEqualTo(2); - redisson.getTopic("topic").publish(1); - - await().atMost(75, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); - assertThat(topic.countListeners()).isEqualTo(2); - assertThat(executed.get()).isTrue(); - - redisson.shutdown(); - redisCluster.stop(); + redisson.shutdown(); + }); } @Test @@ -1464,57 +1453,59 @@ public class RedissonTopicTest extends RedisDockerTest { @Test public void testReattachInClusterMaster2() throws Exception { - withCluster(redisson -> { - Queue messages = new ConcurrentLinkedQueue<>(); - Queue subscriptions = new ConcurrentLinkedQueue<>(); + RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); + RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); + RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + ClusterProcesses process = clusterRunner.run(); - int topicsAmount = 100; - for (int i = 0; i < topicsAmount; i++) { - RTopic topic = redisson.getTopic("topic" + i); - int finalI = i; - topic.addListener(new StatusListener() { + Config config = new Config(); + config.useClusterServers() + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); - @Override - public void onUnsubscribe(String channel) { - } + Queue messages = new ConcurrentLinkedQueue<>(); + Queue subscriptions = new ConcurrentLinkedQueue<>(); - @Override - public void onSubscribe(String channel) { - subscriptions.add("topic" + finalI); - } - }); - topic.addListener(String.class, (channel, msg) -> messages.add(msg)); - } + int topicsAmount = 100; + for (int i = 0; i < topicsAmount; i++) { + RTopic topic = redisson.getTopic("topic" + i); + int finalI = i; + topic.addListener(new StatusListener() { - RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); - nodes.getMasters().forEach(m -> { - RedisClientConfig cc = new RedisClientConfig(); - cc.setAddress("redis://" + m.getAddr().getHostString() + ":" + m.getAddr().getPort()); - RedisClient c = RedisClient.create(cc); - c.connect().async(RedisCommands.SHUTDOWN); - c.shutdown(); + @Override + public void onUnsubscribe(String channel) { + } + + @Override + public void onSubscribe(String channel) { + subscriptions.add("topic" + finalI); + } }); + topic.addListener(String.class, (channel, msg) -> messages.add(msg)); + } - try { - Thread.sleep(TimeUnit.SECONDS.toMillis(30)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + RedisRunner.RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get(); + master.stop(); - assertThat(subscriptions).hasSize(topicsAmount*2); + Thread.sleep(TimeUnit.SECONDS.toMillis(40)); - for (int i = 0; i < topicsAmount; i++) { - RTopic topic = redisson.getTopic("topic" + i); - topic.publish("topic" + i); - } + assertThat(subscriptions).hasSize(140); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - assertThat(messages).hasSize(topicsAmount); - }); + for (int i = 0; i < topicsAmount; i++) { + RTopic topic = redisson.getTopic("topic" + i); + topic.publish("topic" + i); + } + + Thread.sleep(100); + assertThat(messages).hasSize(topicsAmount); } public void withCluster(Consumer callback) { @@ -1542,38 +1533,106 @@ public class RedissonTopicTest extends RedisDockerTest { redisCluster.stop(); } - @Test - public void testReattachInClusterMaster() throws Exception { - GenericContainer redisCluster = new GenericContainer<>("vishnunair/docker-redis-cluster") - .withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384) - .withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10))); - redisCluster.start(); + public void withCluster2(Consumer callback) { + List nodes = new ArrayList<>(); + + DockerComposeContainer environment = + new DockerComposeContainer(new File("src/test/resources/docker-compose.yml")) + .withExposedService("redis-node-0", 6379) + .withExposedService("redis-node-1", 6379) + .withExposedService("redis-node-2", 6379) + .withExposedService("redis-node-3", 6379) + .withExposedService("redis-node-4", 6379) + .withExposedService("redis-node-5", 6379) + .withExposedService("redis-node-6", 6379) + .withExposedService("redis-node-7", 6379); + + environment.start(); + + try { + Thread.sleep(25000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + for (int i = 0; i < 8; i++) { + Optional cc = environment.getContainerByServiceName("redis-node-" + i); + nodes.add(cc.get().getContainerInfo()); + } + + Optional cc2 = environment.getContainerByServiceName("redis-node-0"); Config config = new Config(); config.useClusterServers() - .setSubscriptionMode(SubscriptionMode.MASTER) .setNatMapper(new NatMapper() { + @Override public RedisURI map(RedisURI uri) { - if (redisCluster.getMappedPort(uri.getPort()) == null) { - return uri; + for (InspectContainerResponse node : nodes) { + Ports.Binding[] mappedPort = node.getNetworkSettings() + .getPorts().getBindings().get(new ExposedPort(uri.getPort())); + + Map ss = node.getNetworkSettings().getNetworks(); + ContainerNetwork s = ss.values().iterator().next(); + + if (mappedPort != null + && s.getIpAddress().equals(uri.getHost())) { + return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); + } } - return new RedisURI(uri.getScheme(), redisCluster.getHost(), redisCluster.getMappedPort(uri.getPort())); + return uri; } }) - .addNodeAddress("redis://127.0.0.1:" + redisCluster.getFirstMappedPort()); + .addNodeAddress("redis://127.0.0.1:" + cc2.get().getFirstMappedPort()); + RedissonClient redisson = Redisson.create(config); - + + RedisCluster nodes2 = redisson.getRedisNodes(RedisNodes.CLUSTER); + for (RedisClusterSlave slave : nodes2.getSlaves()) { + slave.setConfig("cluster-node-timeout", "1000"); + } + for (RedisClusterMaster master : nodes2.getMasters()) { + master.setConfig("cluster-node-timeout", "1000"); + } + + callback.accept(redisson); + redisson.shutdown(); + environment.stop(); + } + + @Test + public void testReattachInClusterMaster() throws Exception { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + ClusterProcesses process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setSubscriptionMode(SubscriptionMode.MASTER) + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + final AtomicBoolean executed = new AtomicBoolean(); final AtomicInteger subscriptions = new AtomicInteger(); - + RTopic topic = redisson.getTopic("3"); topic.addListener(new StatusListener() { - + @Override public void onUnsubscribe(String channel) { } - + @Override public void onSubscribe(String channel) { subscriptions.incrementAndGet(); @@ -1585,32 +1644,29 @@ public class RedissonTopicTest extends RedisDockerTest { executed.set(true); } }); - + sendCommands(redisson, "3"); - RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); - nodes.getMasters().forEach(m -> { - RedisClientConfig cc = new RedisClientConfig(); - cc.setAddress("redis://" + m.getAddr().getHostString() + ":" + m.getAddr().getPort()); - RedisClient c = RedisClient.create(cc); - c.connect().async(RedisCommands.SHUTDOWN); - c.shutdown(); - try { - Thread.sleep(18000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); + process.getNodes().stream().filter(x -> master1.getPort() == x.getRedisServerPort()) + .forEach(x -> { + try { + x.stop(); + Thread.sleep(18000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }); Thread.sleep(25000); redisson.getTopic("3").publish(1); - + await().atMost(75, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); assertThat(executed.get()).isTrue(); - + redisson.shutdown(); - redisCluster.stop(); + process.shutdown(); } @Test