From 69228326c3c087d5398c07a5e4d3a299314a23d7 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 23 Nov 2023 08:54:03 +0300 Subject: [PATCH] refactoring --- .../redisson/RedissonTopicPatternTest.java | 252 ++++++++---------- 1 file changed, 113 insertions(+), 139 deletions(-) diff --git a/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java b/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java index 0871fb71e..da89932c3 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java @@ -1,9 +1,10 @@ package org.redisson; +import com.github.dockerjava.api.model.PortBinding; +import com.github.dockerjava.api.model.Ports; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.redisson.RedisRunner.RedisProcess; import org.redisson.api.RBucket; import org.redisson.api.RPatternTopic; import org.redisson.api.RTopic; @@ -11,12 +12,16 @@ import org.redisson.api.RedissonClient; import org.redisson.api.listener.BasePatternStatusListener; import org.redisson.api.listener.PatternMessageListener; import org.redisson.api.listener.PatternStatusListener; +import org.redisson.api.redisnode.RedisCluster; +import org.redisson.api.redisnode.RedisClusterMaster; +import org.redisson.api.redisnode.RedisClusterSlave; +import org.redisson.api.redisnode.RedisNodes; import org.redisson.client.codec.StringCodec; import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; import org.redisson.connection.balancer.RandomLoadBalancer; +import org.testcontainers.containers.GenericContainer; -import java.io.IOException; import java.io.Serializable; import java.time.Duration; import java.util.ArrayList; @@ -28,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -public class RedissonTopicPatternTest extends BaseTest { +public class RedissonTopicPatternTest extends RedisDockerTest { public static class Message implements Serializable { @@ -69,127 +74,95 @@ public class RedissonTopicPatternTest extends BaseTest { } @Test - public void testCluster() throws IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents( - RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, - RedisRunner.KEYSPACE_EVENTS_OPTIONS.g); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents( - RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, - RedisRunner.KEYSPACE_EVENTS_OPTIONS.g); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents( - RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, - RedisRunner.KEYSPACE_EVENTS_OPTIONS.g); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents( - RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, - RedisRunner.KEYSPACE_EVENTS_OPTIONS.g); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents( - RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, - RedisRunner.KEYSPACE_EVENTS_OPTIONS.g); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents( - RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, - RedisRunner.KEYSPACE_EVENTS_OPTIONS.g); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterRunner.ClusterProcesses process = clusterRunner.run(); + public void testCluster() { + testInCluster(redisson -> { + RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); + for (RedisClusterSlave slave : nodes.getSlaves()) { + slave.setConfig("notify-keyspace-events", "Eg"); + } + for (RedisClusterMaster master : nodes.getMasters()) { + master.setConfig("notify-keyspace-events", "Eg"); + } - Thread.sleep(3000); + AtomicInteger subscribeCounter = new AtomicInteger(); + RPatternTopic topic = redisson.getPatternTopic("__keyevent@*", StringCodec.INSTANCE); + topic.addListener(new PatternStatusListener() { + @Override + public void onPSubscribe(String pattern) { + subscribeCounter.incrementAndGet(); + } - Config config = new Config(); - config.useClusterServers() - .setPingConnectionInterval(0) - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); + @Override + public void onPUnsubscribe(String pattern) { + System.out.println("onPUnsubscribe: " + pattern); + } + }); - AtomicInteger subscribeCounter = new AtomicInteger(); - RPatternTopic topic = redisson.getPatternTopic("__keyevent@*", StringCodec.INSTANCE); - topic.addListener(new PatternStatusListener() { - @Override - public void onPSubscribe(String pattern) { - subscribeCounter.incrementAndGet(); - } + AtomicInteger counter = new AtomicInteger(); - @Override - public void onPUnsubscribe(String pattern) { - System.out.println("onPUnsubscribe: " + pattern); + PatternMessageListener listener = (pattern, channel, msg) -> { + System.out.println("mes " + channel + " counter " + counter.get()); + counter.incrementAndGet(); + }; + topic.addListener(String.class, listener); + + for (int i = 0; i < 10; i++) { + redisson.getBucket("" + i).set(i); + redisson.getBucket("" + i).delete(); + try { + Thread.sleep(7); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } - }); - - AtomicInteger counter = new AtomicInteger(); - PatternMessageListener listener = (pattern, channel, msg) -> { - System.out.println("mes " + channel + " counter " + counter.get()); - counter.incrementAndGet(); - }; - topic.addListener(String.class, listener); + Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> counter.get() > 9); + assertThat(subscribeCounter.get()).isEqualTo(1); - for (int i = 0; i < 10; i++) { - redisson.getBucket("" + i).set(i); - redisson.getBucket("" + i).delete(); - Thread.sleep(7); - } - - Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> counter.get() > 9); - assertThat(subscribeCounter.get()).isEqualTo(1); + for (RedisClusterSlave slave : nodes.getSlaves()) { + slave.setConfig("notify-keyspace-events", ""); + } + for (RedisClusterMaster master : nodes.getMasters()) { + master.setConfig("notify-keyspace-events", ""); + } - redisson.shutdown(); - process.shutdown(); + topic.removeAllListeners(); + }); } @Test - public void testNonEventMessagesInCluster() throws IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + public void testNonEventMessagesInCluster() { + testInCluster(redisson -> { + AtomicInteger subscribeCounter = new AtomicInteger(); + RPatternTopic topic = redisson.getPatternTopic("my*", StringCodec.INSTANCE); + topic.addListener(new PatternStatusListener() { + @Override + public void onPSubscribe(String pattern) { + subscribeCounter.incrementAndGet(); + } - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterRunner.ClusterProcesses process = clusterRunner.run(); + @Override + public void onPUnsubscribe(String pattern) { + System.out.println("onPUnsubscribe: " + pattern); + } + }); - Config config = new Config(); - config.useClusterServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); + AtomicInteger counter = new AtomicInteger(); - AtomicInteger subscribeCounter = new AtomicInteger(); - RPatternTopic topic = redisson.getPatternTopic("my*", StringCodec.INSTANCE); - topic.addListener(new PatternStatusListener() { - @Override - public void onPSubscribe(String pattern) { - subscribeCounter.incrementAndGet(); - } + PatternMessageListener listener = (pattern, channel, msg) -> { + counter.incrementAndGet(); + }; + topic.addListener(String.class, listener); - @Override - public void onPUnsubscribe(String pattern) { - System.out.println("onPUnsubscribe: " + pattern); + for (int i = 0; i < 100; i++) { + redisson.getTopic("my" + i).publish(123); } - }); - - AtomicInteger counter = new AtomicInteger(); - - PatternMessageListener listener = (pattern, channel, msg) -> { - counter.incrementAndGet(); - }; - topic.addListener(String.class, listener); - - for (int i = 0; i < 100; i++) { - redisson.getTopic("my" + i).publish(123); - } - Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> counter.get() == 100); - assertThat(subscribeCounter.get()).isEqualTo(1); + Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> counter.get() == 100); + assertThat(subscribeCounter.get()).isEqualTo(1); - redisson.shutdown(); - process.shutdown(); + topic.removeAllListeners(); + }); } @Test @@ -211,6 +184,7 @@ public class RedissonTopicPatternTest extends BaseTest { Assertions.assertEquals(1, i.get()); Assertions.assertEquals(1, str.get()); + topic1.removeAllListeners(); } @Test @@ -232,14 +206,14 @@ public class RedissonTopicPatternTest extends BaseTest { redisson.getTopic("topic1.t3").publish(new Message("123")); Assertions.assertTrue(messageRecieved.await(5, TimeUnit.SECONDS)); + topic1.removeAllListeners(); } @Test public void testLazyUnsubscribe() throws InterruptedException { final CountDownLatch messageRecieved = new CountDownLatch(1); - RedissonClient redisson1 = BaseTest.createInstance(); - RPatternTopic topic1 = redisson1.getPatternTopic("topic.*"); + RPatternTopic topic1 = redisson.getPatternTopic("topic.*"); int listenerId = topic1.addListener(Message.class, (pattern, channel, msg) -> { Assertions.fail(); }); @@ -248,7 +222,7 @@ public class RedissonTopicPatternTest extends BaseTest { topic1.removeListener(listenerId); Thread.sleep(1000); - RedissonClient redisson2 = BaseTest.createInstance(); + RedissonClient redisson2 = createInstance(); RPatternTopic topic2 = redisson2.getPatternTopic("topic.*"); topic2.addListener(Message.class, (pattern, channel, msg) -> { Assertions.assertTrue(pattern.equals("topic.*")); @@ -262,7 +236,7 @@ public class RedissonTopicPatternTest extends BaseTest { Assertions.assertTrue(messageRecieved.await(5, TimeUnit.SECONDS)); - redisson1.shutdown(); + topic1.removeAllListeners(); redisson2.shutdown(); } @@ -271,8 +245,7 @@ public class RedissonTopicPatternTest extends BaseTest { final CountDownLatch messageRecieved = new CountDownLatch(5); final CountDownLatch statusRecieved = new CountDownLatch(1); - RedissonClient redisson1 = BaseTest.createInstance(); - RPatternTopic topic1 = redisson1.getPatternTopic("topic.*"); + RPatternTopic topic1 = redisson.getPatternTopic("topic.*"); topic1.addListener(new BasePatternStatusListener() { @Override public void onPSubscribe(String pattern) { @@ -285,7 +258,7 @@ public class RedissonTopicPatternTest extends BaseTest { messageRecieved.countDown(); }); - RedissonClient redisson2 = BaseTest.createInstance(); + RedissonClient redisson2 = createInstance(); RTopic topic2 = redisson2.getTopic("topic.t1"); topic2.addListener(Message.class, (channel, msg) -> { Assertions.assertEquals(new Message("123"), msg); @@ -305,14 +278,13 @@ public class RedissonTopicPatternTest extends BaseTest { statusRecieved.await(); Assertions.assertTrue(messageRecieved.await(5, TimeUnit.SECONDS)); - redisson1.shutdown(); + topic1.removeAllListeners(); redisson2.shutdown(); } @Test - public void testListenerRemove() throws InterruptedException { - RedissonClient redisson1 = BaseTest.createInstance(); - RPatternTopic topic1 = redisson1.getPatternTopic("topic.*"); + public void testListenerRemove() { + RPatternTopic topic1 = redisson.getPatternTopic("topic.*"); final CountDownLatch l = new CountDownLatch(1); topic1.addListener(new BasePatternStatusListener() { @Override @@ -325,19 +297,19 @@ public class RedissonTopicPatternTest extends BaseTest { Assertions.fail(); }); - RedissonClient redisson2 = BaseTest.createInstance(); + RedissonClient redisson2 = createInstance(); RTopic topic2 = redisson2.getTopic("topic.t1"); topic1.removeListener(id); topic2.publish(new Message("123")); - redisson1.shutdown(); + topic1.removeAllListeners(); redisson2.shutdown(); } @Test public void testConcurrentTopic() throws Exception { - int threads = 30; - int loops = 50000; + int threads = 16; + int loops = 25000; ExecutorService executor = Executors.newFixedThreadPool(threads); List> futures = new ArrayList<>(); @@ -372,6 +344,9 @@ public class RedissonTopicPatternTest extends BaseTest { for (Future future : futures) { future.get(); } + + RPatternTopic t = redisson.getPatternTopic("PUBSUB*"); + t.removeAllListeners(); } @Test @@ -462,19 +437,23 @@ public class RedissonTopicPatternTest extends BaseTest { assertThat(executions.get()).isEqualTo(100); redisson.shutdown(); - process.shutdown(); } @Test - public void testReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .randomPort() - .run(); - + public void testReattach() throws InterruptedException { + GenericContainer redis = + new GenericContainer<>("redis:7.2") + .withExposedPorts(6379); + redis.start(); + int port = redis.getFirstMappedPort(); + redis.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(port)), + cmd.getExposedPorts()[0])); + }); + Config config = new Config(); - config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort()); + config.useSingleServer().setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort()); RedissonClient redisson = Redisson.create(config); final AtomicBoolean executed = new AtomicBoolean(); @@ -488,15 +467,10 @@ public class RedissonTopicPatternTest extends BaseTest { } } }); - - runner.stop(); - runner = new RedisRunner() - .port(runner.getRedisServerPort()) - .nosave() - .randomDir() - .run(); - + redis.stop(); + redis.start(); + Thread.sleep(1000); redisson.getTopic("topic1").publish(1); @@ -504,7 +478,7 @@ public class RedissonTopicPatternTest extends BaseTest { await().atMost(5, TimeUnit.SECONDS).untilTrue(executed); redisson.shutdown(); - runner.stop(); + redis.stop(); } }