From 2c1ab3df94c6db87a91b3177583c319655a9f9e7 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 21 Nov 2023 11:22:12 +0300 Subject: [PATCH] refactoring --- .../org/redisson/RedissonBucketsTest.java | 182 ++- .../RedissonCollectionMapReduceTest.java | 4 +- .../redisson/RedissonCountDownLatchTest.java | 2 +- .../java/org/redisson/RedissonTopicTest.java | 1086 +++++++++-------- .../redisson/RedissonTransferQueueTest.java | 15 +- .../java/org/redisson/jcache/JCacheTest.java | 210 +--- 6 files changed, 713 insertions(+), 786 deletions(-) diff --git a/redisson/src/test/java/org/redisson/RedissonBucketsTest.java b/redisson/src/test/java/org/redisson/RedissonBucketsTest.java index 9129f5453..2884a1484 100644 --- a/redisson/src/test/java/org/redisson/RedissonBucketsTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBucketsTest.java @@ -1,123 +1,95 @@ package org.redisson; -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - import org.junit.jupiter.api.Test; -import org.redisson.ClusterRunner.ClusterProcesses; -import org.redisson.RedisRunner.FailedToStartRedisException; import org.redisson.api.NameMapper; import org.redisson.api.RBucket; import org.redisson.api.RBuckets; import org.redisson.api.RedissonClient; import org.redisson.client.codec.StringCodec; import org.redisson.config.Config; -import org.redisson.connection.balancer.RandomLoadBalancer; +import org.redisson.config.ReadMode; -public class RedissonBucketsTest extends BaseTest { +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RedissonBucketsTest extends RedisDockerTest { @Test - public void testGetInClusterNameMapper() throws FailedToStartRedisException, IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterProcesses process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setNameMapper(new NameMapper() { - @Override - public String map(String name) { - return "test::" + name; - } - - @Override - public String unmap(String name) { - return name.replace("test::", ""); - } - }) - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - int size = 10000; - Map map = new HashMap<>(); - for (int i = 0; i < 10; i++) { - map.put("test" + i, i); - } - for (int i = 10; i < size; i++) { - map.put("test" + i + "{" + (i%100)+ "}", i); - } - - redisson.getBuckets().set(map); - - Set queryKeys = new HashSet<>(map.keySet()); - queryKeys.add("test_invalid"); - Map buckets = redisson.getBuckets().get(queryKeys.toArray(new String[map.size()])); - - assertThat(buckets).isEqualTo(map); - - for (int i = 0; i < 10; i++) { - assertThat(redisson.getBucket("test" + i).get()).isEqualTo(i); - } - - redisson.shutdown(); - process.shutdown(); + public void testGetInClusterNameMapper() { + testInCluster(client -> { + Config config = client.getConfig(); + config.useClusterServers() + .setReadMode(ReadMode.MASTER) + .setNameMapper(new NameMapper() { + @Override + public String map(String name) { + return "test::" + name; + } + + @Override + public String unmap(String name) { + return name.replace("test::", ""); + } + }); + + RedissonClient redisson = Redisson.create(config); + + int size = 10000; + Map map = new HashMap<>(); + for (int i = 0; i < 10; i++) { + map.put("test" + i, i); + } + for (int i = 10; i < size; i++) { + map.put("test" + i + "{" + (i % 100) + "}", i); + } + + redisson.getBuckets().set(map); + + Set queryKeys = new HashSet<>(map.keySet()); + queryKeys.add("test_invalid"); + Map buckets = redisson.getBuckets().get(queryKeys.toArray(new String[map.size()])); + + assertThat(buckets).isEqualTo(map); + + for (int i = 0; i < 10; i++) { + assertThat(redisson.getBucket("test" + i).get()).isEqualTo(i); + } + + redisson.shutdown(); + }); } @Test - public void testGetInCluster() throws FailedToStartRedisException, IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterProcesses process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - int size = 10000; - Map map = new HashMap<>(); - for (int i = 0; i < 10; i++) { - map.put("test" + i, i); - } - for (int i = 10; i < size; i++) { - map.put("test" + i + "{" + (i%100)+ "}", i); - } - - redisson.getBuckets().set(map); - - Set queryKeys = new HashSet<>(map.keySet()); - queryKeys.add("test_invalid"); - Map buckets = redisson.getBuckets().get(queryKeys.toArray(new String[map.size()])); - - assertThat(buckets).isEqualTo(map); - - redisson.shutdown(); - process.shutdown(); + public void testGetInCluster() { + testInCluster(client -> { + Config config = client.getConfig(); + config.useClusterServers() + .setReadMode(ReadMode.MASTER); + RedissonClient redisson = Redisson.create(config); + + int size = 10000; + Map map = new HashMap<>(); + for (int i = 0; i < 10; i++) { + map.put("test" + i, i); + } + for (int i = 10; i < size; i++) { + map.put("test" + i + "{" + (i%100)+ "}", i); + } + + redisson.getBuckets().set(map); + + Set queryKeys = new HashSet<>(map.keySet()); + queryKeys.add("test_invalid"); + Map buckets = redisson.getBuckets().get(queryKeys.toArray(new String[map.size()])); + + assertThat(buckets).isEqualTo(map); + + redisson.shutdown(); + }); } @Test diff --git a/redisson/src/test/java/org/redisson/RedissonCollectionMapReduceTest.java b/redisson/src/test/java/org/redisson/RedissonCollectionMapReduceTest.java index d8b9dd799..0749e3fb8 100644 --- a/redisson/src/test/java/org/redisson/RedissonCollectionMapReduceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonCollectionMapReduceTest.java @@ -14,7 +14,7 @@ import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; -public class RedissonCollectionMapReduceTest extends BaseTest { +public class RedissonCollectionMapReduceTest extends RedisDockerTest { public static class WordMapper implements RCollectionMapper { @@ -208,7 +208,7 @@ public class RedissonCollectionMapReduceTest extends BaseTest { if (RList.class.isAssignableFrom(mapClass)) { list = redisson.getList("list"); } else if (RQueue.class.isAssignableFrom(mapClass)) { - list = (RList) redisson.getQueue("queue"); + list = redisson.getList("queue"); } return list; } diff --git a/redisson/src/test/java/org/redisson/RedissonCountDownLatchTest.java b/redisson/src/test/java/org/redisson/RedissonCountDownLatchTest.java index 526ff7123..59957e3e6 100644 --- a/redisson/src/test/java/org/redisson/RedissonCountDownLatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonCountDownLatchTest.java @@ -14,7 +14,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; -public class RedissonCountDownLatchTest extends BaseTest { +public class RedissonCountDownLatchTest extends RedisDockerTest { @Test public void testAwaitTimeout() throws InterruptedException { diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 669dc1fdd..bf74056ff 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -1,5 +1,10 @@ package org.redisson; +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.model.ContainerNetwork; +import com.github.dockerjava.api.model.ExposedPort; +import com.github.dockerjava.api.model.PortBinding; +import com.github.dockerjava.api.model.Ports; import org.awaitility.Awaitility; import org.junit.jupiter.api.*; import org.redisson.ClusterRunner.ClusterProcesses; @@ -7,6 +12,10 @@ import org.redisson.RedisRunner.KEYSPACE_EVENTS_OPTIONS; import org.redisson.RedisRunner.RedisProcess; import org.redisson.api.*; import org.redisson.api.listener.*; +import org.redisson.api.redisnode.RedisCluster; +import org.redisson.api.redisnode.RedisClusterMaster; +import org.redisson.api.redisnode.RedisClusterSlave; +import org.redisson.api.redisnode.RedisNodes; import org.redisson.client.*; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; @@ -15,7 +24,13 @@ import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.cluster.ClusterNodeInfo; import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; +import org.redisson.connection.SequentialDnsAddressResolverFactory; import org.redisson.connection.balancer.RandomLoadBalancer; +import org.redisson.misc.RedisURI; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; import java.io.IOException; import java.io.Serializable; @@ -26,40 +41,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -public class RedissonTopicTest { - - @BeforeAll - public static void beforeClass() throws IOException, InterruptedException { - if (!RedissonRuntimeEnvironment.isTravis) { - RedisRunner.startDefaultRedisServerInstance(); - } - } - - @AfterAll - public static void afterClass() throws InterruptedException { - if (!RedissonRuntimeEnvironment.isTravis) { - RedisRunner.shutDownDefaultRedisServerInstance(); - } - } - - @BeforeEach - public void before() throws IOException, InterruptedException { - if (RedissonRuntimeEnvironment.isTravis) { - RedisRunner.startDefaultRedisServerInstance(); - } - } - - @AfterEach - public void after() throws InterruptedException { - if (RedissonRuntimeEnvironment.isTravis) { - RedisRunner.shutDownDefaultRedisServerInstance(); - } - } +public class RedissonTopicTest extends RedisDockerTest { public static class Message implements Serializable { @@ -97,40 +86,33 @@ public class RedissonTopicTest { @Test public void testCluster() throws IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents( - RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, - RedisRunner.KEYSPACE_EVENTS_OPTIONS.g); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents( - RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, - RedisRunner.KEYSPACE_EVENTS_OPTIONS.g); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents( - RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, - RedisRunner.KEYSPACE_EVENTS_OPTIONS.g); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents( - RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, - RedisRunner.KEYSPACE_EVENTS_OPTIONS.g); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents( - RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, - RedisRunner.KEYSPACE_EVENTS_OPTIONS.g); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents( - RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, - RedisRunner.KEYSPACE_EVENTS_OPTIONS.g); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterRunner.ClusterProcesses process = clusterRunner.run(); - - Thread.sleep(3000); + GenericContainer redisCluster = new GenericContainer<>("vishnunair/docker-redis-cluster") + .withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384) + .withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10))); + redisCluster.start(); Config config = new Config(); config.useClusterServers() - .setPingConnectionInterval(0) - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + .setNatMapper(new NatMapper() { + @Override + public RedisURI map(RedisURI uri) { + if (redisCluster.getMappedPort(uri.getPort()) == null) { + return uri; + } + return new RedisURI(uri.getScheme(), redisCluster.getHost(), redisCluster.getMappedPort(uri.getPort())); + } + }) + .addNodeAddress("redis://127.0.0.1:" + redisCluster.getFirstMappedPort()); RedissonClient redisson = Redisson.create(config); + RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); + for (RedisClusterSlave slave : nodes.getSlaves()) { + slave.setConfig("notify-keyspace-events", "Eg"); + } + for (RedisClusterMaster master : nodes.getMasters()) { + master.setConfig("notify-keyspace-events", "Eg"); + } + AtomicInteger subscribedCounter = new AtomicInteger(); AtomicInteger unsubscribedCounter = new AtomicInteger(); RTopic topic = redisson.getTopic("__keyevent@0__:del", StringCodec.INSTANCE); @@ -167,12 +149,11 @@ public class RedissonTopicTest { topic.removeListener(id1, id2); redisson.shutdown(); - process.shutdown(); + redisCluster.stop(); } @Test public void testCountSubscribers() { - RedissonClient redisson = BaseTest.createInstance(); RTopic topic1 = redisson.getTopic("topic", LongCodec.INSTANCE); assertThat(topic1.countSubscribers()).isZero(); int id = topic1.addListener(Long.class, (channel, msg) -> { @@ -180,13 +161,10 @@ public class RedissonTopicTest { assertThat(topic1.countSubscribers()).isOne(); topic1.removeListener(id); assertThat(topic1.countSubscribers()).isZero(); - - redisson.shutdown(); } @Test public void testCountListeners() { - RedissonClient redisson = BaseTest.createInstance(); RTopic topic1 = redisson.getTopic("topic", LongCodec.INSTANCE); assertThat(topic1.countListeners()).isZero(); int id = topic1.addListener(Long.class, (channel, msg) -> { @@ -204,13 +182,11 @@ public class RedissonTopicTest { topic2.removeListener(id2); assertThat(topic2.countListeners()).isZero(); - - redisson.shutdown(); } @Test public void testPing() throws InterruptedException { - Config config = BaseTest.createConfig(); + Config config = createConfig(); config.useSingleServer() .setPingConnectionInterval(50) .setConnectTimeout(20_000) @@ -220,13 +196,11 @@ public class RedissonTopicTest { .setConnectionPoolSize(16); RedissonClient redisson = Redisson.create(config); - int count = 6000; + int count = 3000; CountDownLatch latch = new CountDownLatch(count); RTopic eventsTopic = redisson.getTopic("eventsTopic"); - AtomicInteger co = new AtomicInteger(); eventsTopic.addListener(String.class, (channel, msg) -> { - co.incrementAndGet(); latch.countDown(); }); @@ -236,17 +210,15 @@ public class RedissonTopicTest { Thread.sleep(10); } - assertThat(latch.await(40, TimeUnit.SECONDS)).isTrue(); + assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); redisson.shutdown(); } @Test public void testConcurrentTopic() throws Exception { - RedissonClient redisson = BaseTest.createInstance(); - - int threads = 30; - int loops = 50000; + int threads = 16; + int loops = 25000; ExecutorService executor = Executors.newFixedThreadPool(threads); List> futures = new ArrayList<>(); @@ -281,15 +253,12 @@ public class RedissonTopicTest { for (Future future : futures) { future.get(); } - - redisson.shutdown(); } @Test - public void testCommandsOrdering() throws InterruptedException { - RedissonClient redisson1 = BaseTest.createInstance(); - RTopic topic1 = redisson1.getTopic("topic", LongCodec.INSTANCE); + public void testCommandsOrdering() { + RTopic topic1 = redisson.getTopic("topic", LongCodec.INSTANCE); AtomicBoolean stringMessageReceived = new AtomicBoolean(); topic1.addListener(Long.class, (channel, msg) -> { assertThat(msg).isEqualTo(123); @@ -299,13 +268,11 @@ public class RedissonTopicTest { await().atMost(Duration.ofSeconds(1)).untilTrue(stringMessageReceived); - redisson1.shutdown(); + topic1.removeAllListeners(); } @Test public void testTopicState() throws InterruptedException { - RedissonClient redisson = BaseTest.createInstance(); - RTopic stringTopic = redisson.getTopic("test1", StringCodec.INSTANCE); for (int i = 0; i < 3; i++) { AtomicInteger stringMessageReceived = new AtomicInteger(); @@ -332,14 +299,10 @@ public class RedissonTopicTest { stringTopic.removeListener(listenerId); patternTopic.removeListener(patternListenerId); } - - redisson.shutdown(); } @Test public void testMultiTypeConnection() throws InterruptedException { - RedissonClient redisson = BaseTest.createInstance(); - RTopic stringTopic = redisson.getTopic("test1", StringCodec.INSTANCE); AtomicBoolean stringMessageReceived = new AtomicBoolean(); stringTopic.addListener(String.class, new MessageListener() { @@ -365,12 +328,11 @@ public class RedissonTopicTest { await().atMost(Duration.ofSeconds(1)).untilTrue(stringMessageReceived); await().atMost(Duration.ofSeconds(1)).untilTrue(longMessageReceived); - redisson.shutdown(); + stringTopic.removeAllListeners(); } @Test public void testSyncCommands() throws InterruptedException { - RedissonClient redisson = BaseTest.createInstance(); RTopic topic = redisson.getTopic("system_bus"); RSet redissonSet = redisson.getSet("set1"); CountDownLatch latch = new CountDownLatch(1); @@ -384,21 +346,20 @@ public class RedissonTopicTest { topic.publish("sometext"); latch.await(); - redisson.shutdown(); + topic.removeAllListeners(); } @Test public void testInnerPublish() throws InterruptedException { - RedissonClient redisson1 = BaseTest.createInstance(); - final RTopic topic1 = redisson1.getTopic("topic1"); + final RTopic topic1 = redisson.getTopic("topic1"); final CountDownLatch messageRecieved = new CountDownLatch(3); int listenerId = topic1.addListener(Message.class, (channel, msg) -> { assertThat(msg).isEqualTo(new Message("test")); messageRecieved.countDown(); }); - RedissonClient redisson2 = BaseTest.createInstance(); + RedissonClient redisson2 = createInstance(); final RTopic topic2 = redisson2.getTopic("topic2"); topic2.addListener(Message.class, (channel, msg) -> { messageRecieved.countDown(); @@ -412,13 +373,12 @@ public class RedissonTopicTest { assertThat(messageRecieved.await(5, TimeUnit.SECONDS)).isTrue(); - redisson1.shutdown(); + topic1.removeAllListeners(); redisson2.shutdown(); } @Test public void testStatus() throws InterruptedException { - RedissonClient redisson = BaseTest.createInstance(); final RTopic topic1 = redisson.getTopic("topic1"); final CountDownLatch l = new CountDownLatch(1); int listenerId = topic1.addListener(new BaseStatusListener() { @@ -442,108 +402,106 @@ public class RedissonTopicTest { topic1.removeListener(listenerId2); assertThat(l.await(5, TimeUnit.SECONDS)).isTrue(); - redisson.shutdown(); } @Test - public void testSlotMigrationInCluster() throws Exception { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slot1) - .addNode(master2, slot2) - .addNode(master3, slot3); - ClusterProcesses process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setScanInterval(1000) - .setSubscriptionMode(SubscriptionMode.MASTER) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - RedisClientConfig cfg = new RedisClientConfig(); - cfg.setAddress(process.getNodes().iterator().next().getRedisServerAddressAndPort()); - RedisClient c = RedisClient.create(cfg); - RedisConnection cc = c.connect(); - List mastersList = cc.sync(RedisCommands.CLUSTER_NODES); - mastersList = mastersList.stream().filter(i -> i.containsFlag(ClusterNodeInfo.Flag.MASTER)).collect(Collectors.toList()); - c.shutdown(); - - ClusterNodeInfo destination = mastersList.stream().filter(i -> i.getSlotRanges().iterator().next().getStartSlot() != 10922).findAny().get(); - ClusterNodeInfo source = mastersList.stream().filter(i -> i.getSlotRanges().iterator().next().getStartSlot() == 10922).findAny().get(); - - RedisClientConfig sourceCfg = new RedisClientConfig(); - sourceCfg.setAddress(source.getAddress()); - RedisClient sourceClient = RedisClient.create(sourceCfg); - RedisConnection sourceConnection = sourceClient.connect(); - - RedisClientConfig destinationCfg = new RedisClientConfig(); - destinationCfg.setAddress(destination.getAddress()); - RedisClient destinationClient = RedisClient.create(destinationCfg); - RedisConnection destinationConnection = destinationClient.connect(); - - AtomicReference reference = new AtomicReference(); - String channelName = "test{kaO}"; - RTopic topic = redisson.getTopic(channelName); - topic.addListener(String.class, (ch, m) -> { - reference.set(m); - }); + public void testSlotMigrationInCluster() { + withCluster(client -> { + Config config = client.getConfig(); + config.useClusterServers() + .setScanInterval(1000) + .setSubscriptionMode(SubscriptionMode.MASTER); + + RedissonClient redisson = Redisson.create(config); + + RedisClientConfig cfg = new RedisClientConfig(); + cfg.setAddress(client.getConfig().useClusterServers().getNodeAddresses().get(0)); + RedisClient c = RedisClient.create(cfg); + RedisConnection cc = c.connect(); + List mastersList = cc.sync(RedisCommands.CLUSTER_NODES); + mastersList = mastersList.stream().filter(i -> i.containsFlag(ClusterNodeInfo.Flag.MASTER)).collect(Collectors.toList()); + c.shutdown(); + + int slot = 10922; + + ClusterNodeInfo destination = mastersList.stream().filter(i -> !i.getSlotRanges().isEmpty() && + !i.getSlotRanges().iterator().next().hasSlot(slot)).findAny().get(); + ClusterNodeInfo source = mastersList.stream().filter(i -> !i.getSlotRanges().isEmpty() && + i.getSlotRanges().iterator().next().hasSlot(slot)).findAny().get(); + + RedisClientConfig sourceCfg = new RedisClientConfig(); + sourceCfg.setAddress(config.useClusterServers().getNatMapper().map(source.getAddress())); + RedisClient sourceClient = RedisClient.create(sourceCfg); + RedisConnection sourceConnection = sourceClient.connect(); + + RedisClientConfig destinationCfg = new RedisClientConfig(); + destinationCfg.setAddress(config.useClusterServers().getNatMapper().map(destination.getAddress())); + RedisClient destinationClient = RedisClient.create(destinationCfg); + RedisConnection destinationConnection = destinationClient.connect(); + + AtomicReference reference = new AtomicReference(); + String channelName = "test{kaO}"; + RTopic topic = redisson.getTopic(channelName); + topic.addListener(String.class, (ch, m) -> { + reference.set(m); + }); - List destList = destinationConnection.sync(RedisCommands.PUBSUB_CHANNELS); - assertThat(destList).isEmpty(); - List sourceList = sourceConnection.sync(RedisCommands.PUBSUB_CHANNELS); - assertThat(sourceList).containsOnly(channelName); - - destinationConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "IMPORTING", source.getNodeId()); - sourceConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "MIGRATING", destination.getNodeId()); - - List keys = sourceConnection.sync(RedisCommands.CLUSTER_GETKEYSINSLOT, source.getSlotRanges().iterator().next().getStartSlot(), 100); - List params = new ArrayList(); - params.add(destination.getAddress().getHost()); - params.add(destination.getAddress().getPort()); - params.add(""); - params.add(0); - params.add(2000); - params.add("KEYS"); - params.addAll(keys); - sourceConnection.async(RedisCommands.MIGRATE, params.toArray()); - - for (ClusterNodeInfo node : mastersList) { - RedisClientConfig cc1 = new RedisClientConfig(); - cc1.setAddress(node.getAddress()); - RedisClient ccc = RedisClient.create(cc1); - RedisConnection connection = ccc.connect(); - connection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "NODE", destination.getNodeId()); - ccc.shutdownAsync(); - } + List destList = destinationConnection.sync(RedisCommands.PUBSUB_CHANNELS); + assertThat(destList).isEmpty(); + List sourceList = sourceConnection.sync(RedisCommands.PUBSUB_CHANNELS); + assertThat(sourceList).containsOnly(channelName); + + destinationConnection.sync(RedisCommands.CLUSTER_SETSLOT, slot, "IMPORTING", source.getNodeId()); + sourceConnection.sync(RedisCommands.CLUSTER_SETSLOT, slot, "MIGRATING", destination.getNodeId()); + + List keys = sourceConnection.sync(RedisCommands.CLUSTER_GETKEYSINSLOT, slot, 100); + List params = new ArrayList<>(); + params.add(destination.getAddress().getHost()); + params.add(destination.getAddress().getPort()); + params.add(""); + params.add(0); + params.add(2000); + params.add("KEYS"); + params.addAll(keys); + sourceConnection.sync(RedisCommands.MIGRATE, params.toArray()); + + for (ClusterNodeInfo node : mastersList) { + if (node.getSlotRanges().isEmpty()) { + continue; + } + RedisClientConfig cc1 = new RedisClientConfig(); + cc1.setAddress(config.useClusterServers().getNatMapper().map(node.getAddress())); + RedisClient ccc = RedisClient.create(cc1); + RedisConnection connection = ccc.connect(); + connection.sync(RedisCommands.CLUSTER_SETSLOT, slot, + "NODE", destination.getNodeId()); + ccc.shutdownAsync(); + } - Thread.sleep(2000); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - topic.publish("mymessage"); - Awaitility.waitAtMost(Duration.ofSeconds(1)).until(() -> reference.get().equals("mymessage")); + topic.publish("mymessage"); + Awaitility.waitAtMost(Duration.ofSeconds(1)).until(() -> reference.get().equals("mymessage")); - List destList2 = destinationConnection.sync(RedisCommands.PUBSUB_CHANNELS); - assertThat(destList2).containsOnly(channelName); - List sourceList2 = sourceConnection.sync(RedisCommands.PUBSUB_CHANNELS); - assertThat(sourceList2).isEmpty(); + List sourceList2 = sourceConnection.sync(RedisCommands.PUBSUB_CHANNELS); + assertThat(sourceList2).isEmpty(); + List destList2 = destinationConnection.sync(RedisCommands.PUBSUB_CHANNELS); + assertThat(destList2).containsOnly(channelName); - sourceClient.shutdown(); - destinationClient.shutdown(); - redisson.shutdown(); - process.shutdown(); + sourceClient.shutdown(); + destinationClient.shutdown(); + redisson.shutdown(); + }); } @Test public void testUnsubscribe() throws InterruptedException { final CountDownLatch messageRecieved = new CountDownLatch(1); - RedissonClient redisson = BaseTest.createInstance(); RTopic topic1 = redisson.getTopic("topic1"); int listenerId = topic1.addListener(Message.class, (channel, msg) -> { Assertions.fail(); @@ -560,12 +518,11 @@ public class RedissonTopicTest { assertThat(messageRecieved.await(5, TimeUnit.SECONDS)).isTrue(); - redisson.shutdown(); + topic1.removeAllListeners(); } @Test public void testRemoveAllListeners() throws InterruptedException { - RedissonClient redisson = BaseTest.createInstance(); RTopic topic1 = redisson.getTopic("topic1"); AtomicInteger counter = new AtomicInteger(); @@ -581,23 +538,15 @@ public class RedissonTopicTest { Thread.sleep(1000); assertThat(counter.get()).isZero(); - - redisson.shutdown(); + topic1.removeAllListeners(); } @Test - public void testSubscribeLimit() throws Exception { - RedisProcess runner = new RedisRunner() - .port(RedisRunner.findFreePort()) - .nosave() - .randomDir() - .run(); - + public void testSubscribeLimit() { + Config config = redisson.getConfig(); int connection = 10; int subscription = 5; - Config config = new Config(); config.useSingleServer() - .setAddress("redis://localhost:" + runner.getRedisServerPort()) .setSubscriptionConnectionPoolSize(connection) .setSubscriptionsPerConnection(subscription); RedissonClient redissonClient = Redisson.create(config); @@ -630,12 +579,10 @@ public class RedissonTopicTest { } redissonClient.shutdown(); - runner.stop(); } @Test public void testRemoveAllListeners2() throws InterruptedException { - RedissonClient redisson = BaseTest.createInstance(); RTopic topic1 = redisson.getTopic("topic1"); AtomicInteger counter = new AtomicInteger(); @@ -652,13 +599,10 @@ public class RedissonTopicTest { } Awaitility.await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(counter.get()).isZero()); - - redisson.shutdown(); } @Test public void testRemoveByInstance() throws InterruptedException { - RedissonClient redisson = BaseTest.createInstance(); RTopic topic1 = redisson.getTopic("topic1"); MessageListener listener = new MessageListener() { @Override @@ -672,8 +616,6 @@ public class RedissonTopicTest { topic1 = redisson.getTopic("topic1"); topic1.removeListener(listener); topic1.publish(new Message("123")); - - redisson.shutdown(); } @@ -681,8 +623,7 @@ public class RedissonTopicTest { public void testLazyUnsubscribe() throws InterruptedException { final CountDownLatch messageRecieved = new CountDownLatch(1); - RedissonClient redisson1 = BaseTest.createInstance(); - RTopic topic1 = redisson1.getTopic("topic"); + RTopic topic1 = redisson.getTopic("topic"); int listenerId = topic1.addListener(Message.class, (channel, msg) -> { Assertions.fail(); }); @@ -690,7 +631,7 @@ public class RedissonTopicTest { topic1.removeListener(listenerId); Thread.sleep(1000); - RedissonClient redisson2 = BaseTest.createInstance(); + RedissonClient redisson2 = createInstance(); RTopic topic2 = redisson2.getTopic("topic"); topic2.addListener(Message.class, (channel, msg) -> { assertThat(msg).isEqualTo(new Message("123")); @@ -700,7 +641,6 @@ public class RedissonTopicTest { assertThat(messageRecieved.await(5, TimeUnit.SECONDS)).isTrue(); - redisson1.shutdown(); redisson2.shutdown(); } @@ -708,14 +648,13 @@ public class RedissonTopicTest { public void test() throws InterruptedException { final CountDownLatch messageRecieved = new CountDownLatch(2); - RedissonClient redisson1 = BaseTest.createInstance(); - RTopic topic1 = redisson1.getTopic("topic"); + RTopic topic1 = redisson.getTopic("topic"); topic1.addListener(Message.class, (channel, msg) -> { assertThat(msg).isEqualTo(new Message("123")); messageRecieved.countDown(); }); - RedissonClient redisson2 = BaseTest.createInstance(); + RedissonClient redisson2 = createInstance(); RTopic topic2 = redisson2.getTopic("topic"); topic2.addListener(Message.class, (channel, msg) -> { assertThat(msg).isEqualTo(new Message("123")); @@ -725,7 +664,7 @@ public class RedissonTopicTest { messageRecieved.await(); - redisson1.shutdown(); + topic1.removeAllListeners(); redisson2.shutdown(); } @@ -734,15 +673,14 @@ public class RedissonTopicTest { final CountDownLatch messageRecieved = new CountDownLatch(1000); AtomicLong counter = new AtomicLong(); - RedissonClient redisson1 = BaseTest.createInstance(); - RTopic topic1 = redisson1.getTopic("topic"); + RTopic topic1 = redisson.getTopic("topic"); topic1.addListener(Message.class, (channel, msg) -> { assertThat(msg).isEqualTo(new Message("123")); messageRecieved.countDown(); counter.incrementAndGet(); }); - RedissonClient redisson2 = BaseTest.createInstance(); + RedissonClient redisson2 = createInstance(); RTopic topic2 = redisson2.getTopic("topic"); topic2.addListener(Message.class, (channel, msg) -> { assertThat(msg).isEqualTo(new Message("123")); @@ -760,39 +698,37 @@ public class RedissonTopicTest { assertThat(count).isEqualTo(counter.get()); - redisson1.shutdown(); + topic1.removeAllListeners(); redisson2.shutdown(); } @Test public void testListenerRemove() throws InterruptedException { - RedissonClient redisson1 = BaseTest.createInstance(); - RTopic topic1 = redisson1.getTopic("topic"); + RTopic topic1 = redisson.getTopic("topic"); int id = topic1.addListener(Message.class, (channel, msg) -> { Assertions.fail(); }); - RedissonClient redisson2 = BaseTest.createInstance(); + RedissonClient redisson2 = createInstance(); RTopic topic2 = redisson2.getTopic("topic"); topic1.removeListener(id); topic2.publish(new Message("123")); Thread.sleep(1000); - redisson1.shutdown(); redisson2.shutdown(); } @Test public void testReattach() throws Exception { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .randomPort() - .run(); - + GenericContainer redis = + new GenericContainer<>("redis:7.2") + .withExposedPorts(6379); + redis.start(); + Config config = new Config(); - config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort()); + config.useSingleServer() + .setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort()); RedissonClient redisson = Redisson.create(config); final AtomicBoolean executed = new AtomicBoolean(); @@ -816,16 +752,12 @@ public class RedissonTopicTest { executed.set(true); } }); - - runner.stop(); - runner = new RedisRunner() - .port(runner.getRedisServerPort()) - .nosave() - .randomDir() - .run(); - - Thread.sleep(1000); + redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":6379")); + redis.stop(); + redis.start(); + + Thread.sleep(2000); redisson.getTopic("topic").publish(1); @@ -833,23 +765,23 @@ public class RedissonTopicTest { await().atMost(2, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); redisson.shutdown(); - runner.stop(); + redis.stop(); } @Test public void testAddListenerFailover() throws Exception { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .randomPort() - .run(); + GenericContainer redis = + new GenericContainer<>("redis:7.2") + .withExposedPorts(6379); + redis.start(); Config config = new Config(); config.useSingleServer() - .setAddress(runner.getRedisServerAddressAndPort()); + .setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort()); RedissonClient redisson = Redisson.create(config); - runner.stop(); + redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":6379")); + redis.stop(); RTopic topic = redisson.getTopic("topic"); Assertions.assertThrows(RedisException.class, () -> { @@ -857,11 +789,9 @@ public class RedissonTopicTest { }); }); - runner = new RedisRunner() - .port(runner.getRedisServerPort()) - .nosave() - .randomDir() - .run(); + redis.start(); + + Thread.sleep(3000); AtomicBoolean executed = new AtomicBoolean(); topic.addListener(Integer.class, (channel, msg) -> { @@ -874,7 +804,7 @@ public class RedissonTopicTest { await().atMost(1, TimeUnit.SECONDS).untilTrue(executed); redisson.shutdown(); - runner.stop(); + redis.stop(); } @@ -894,6 +824,86 @@ public class RedissonTopicTest { @Test public void testResubscriptionAfterFailover() throws Exception { +// withSentinel((nodes, redisson) -> { +// Config config = redisson.getConfig(); +// config.useSentinelServers() +// .setSubscriptionsPerConnection(20) +// .setSubscriptionConnectionPoolSize(200); +// +// RedissonClient redissonClient = Redisson.create(config); +// +// ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(5); +// +// AtomicBoolean exceptionDetected = new AtomicBoolean(false); +// +// Deque status = new ConcurrentLinkedDeque<>(); +// Runnable rLockPayload = +// () -> { +// try { +// Integer randomLock = ThreadLocalRandom.current().nextInt(100); +// RLock lock = redissonClient.getLock(randomLock.toString()); +// lock.lock(10, TimeUnit.SECONDS); +// lock.unlock(); +// status.add("ok"); +// } catch (Exception e) { +// status.add("failed"); +// if (e.getCause().getMessage().contains("slaves were synced")) { +// return; +// } +// e.printStackTrace(); +// exceptionDetected.set(true); +// } +// }; +// +// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); +// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); +// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); +// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); +// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); +// +// try { +// Thread.sleep(Duration.ofSeconds(10).toMillis()); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// +// RedisClientConfig masterConfig = new RedisClientConfig().setAddress("redis://127.0.0.1:" + nodes.get(0).getFirstMappedPort()); +// +// //Failover from master to slave +// try { +// RedisClient.create(masterConfig).connect().sync(RedisCommands.SHUTDOWN); +// } catch (RedisTimeoutException e) { +// // node goes down, so this command times out waiting for the response +// } +// +// GenericContainer slave = +// new GenericContainer<>("bitnami/redis") +//// .withNetwork(nodes.get(1).getNetwork()) +// .withEnv("REDIS_REPLICATION_MODE", "slave") +// .withEnv("REDIS_MASTER_HOST", "redis") +// .withEnv("ALLOW_EMPTY_PASSWORD", "yes") +// .withNetworkAliases("slave2") +// .withExposedPorts(6379); +// nodes.add(2, slave); +// slave.setPortBindings(Arrays.asList("6312:6379")); +// slave.start(); +// +// System.out.println("Failover Finished, start to see Subscribe timeouts now. Can't recover this without a refresh of redison client "); +// try { +// Thread.sleep(Duration.ofSeconds(10).toMillis()); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// +// assertThat(exceptionDetected.get()).isFalse(); +// assertThat(status.peekLast()).isEqualTo("ok"); +// +// executor1.shutdown(); +// +// redissonClient.shutdown(); +// +// }, 1); + RedisRunner.RedisProcess master = new RedisRunner() .nosave() .randomDir() @@ -1012,140 +1022,191 @@ public class RedissonTopicTest { slave2.stop(); } - - @Test - public void testReattachInSentinel() throws Exception { - RedisRunner.RedisProcess master = new RedisRunner() - .nosave() - .randomDir() - .port(6399) - .run(); - RedisRunner.RedisProcess slave1 = new RedisRunner() - .port(6380) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6399) - .run(); - RedisRunner.RedisProcess slave2 = new RedisRunner() - .port(6381) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6399) - .run(); - RedisRunner.RedisProcess sentinel1 = new RedisRunner() - .nosave() - .randomDir() - .port(26379) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6399, 2) - .run(); - RedisRunner.RedisProcess sentinel2 = new RedisRunner() - .nosave() - .randomDir() - .port(26380) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6399, 2) - .run(); - RedisRunner.RedisProcess sentinel3 = new RedisRunner() - .nosave() - .randomDir() - .port(26381) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6399, 2) - .run(); - - Thread.sleep(5000); - + private void withSentinel(BiConsumer>, RedissonClient> callback, int slaves) throws InterruptedException { + Network network = Network.newNetwork(); + + List>> nodes = new ArrayList<>(); + + GenericContainer master = + new GenericContainer<>("bitnami/redis") + .withNetwork(network) + .withEnv("REDIS_REPLICATION_MODE", "master") + .withEnv("ALLOW_EMPTY_PASSWORD", "yes") + .withNetworkAliases("redis") + .withExposedPorts(6379); + master.start(); + assert master.getNetwork() == network; + int masterPort = master.getFirstMappedPort(); + master.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(masterPort)), + cmd.getExposedPorts()[0])); + }); + nodes.add(master); + + for (int i = 0; i < slaves; i++) { + GenericContainer slave = + new GenericContainer<>("bitnami/redis") + .withNetwork(network) + .withEnv("REDIS_REPLICATION_MODE", "slave") + .withEnv("REDIS_MASTER_HOST", "redis") + .withEnv("ALLOW_EMPTY_PASSWORD", "yes") + .withNetworkAliases("slave" + i) + .withExposedPorts(6379); + slave.start(); + int slavePort = slave.getFirstMappedPort(); + slave.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(slavePort)), + cmd.getExposedPorts()[0])); + }); + nodes.add(slave); + } + + GenericContainer sentinel1 = + new GenericContainer<>("bitnami/redis-sentinel") + .withNetwork(network) + .withNetworkAliases("sentinel1") + .withExposedPorts(26379); + sentinel1.start(); + int sentinel1Port = sentinel1.getFirstMappedPort(); + sentinel1.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel1Port)), + cmd.getExposedPorts()[0])); + }); + nodes.add(sentinel1); + + GenericContainer sentinel2 = + new GenericContainer<>("bitnami/redis-sentinel") + .withNetwork(network) + .withNetworkAliases("sentinel2") + .withExposedPorts(26379); + sentinel2.start(); + int sentinel2Port = sentinel2.getFirstMappedPort(); + sentinel2.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel2Port)), + cmd.getExposedPorts()[0])); + }); + nodes.add(sentinel2); + + GenericContainer sentinel3 = + new GenericContainer<>("bitnami/redis-sentinel") + .withNetwork(network) + .withNetworkAliases("sentinel3") + .withExposedPorts(26379); + sentinel3.start(); + int sentinel3Port = sentinel3.getFirstMappedPort(); + sentinel3.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel3Port)), + cmd.getExposedPorts()[0])); + }); + nodes.add(sentinel3); + + Thread.sleep(5000); + Config config = new Config(); + config.setAddressResolverGroupFactory(new SequentialDnsAddressResolverFactory() { + + }); config.useSentinelServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster"); + .setNatMapper(new NatMapper() { + + @Override + public RedisURI map(RedisURI uri) { + for (GenericContainer> node : nodes) { + if (node.getContainerInfo() == null) { + continue; + } + + Ports.Binding[] mappedPort = node.getContainerInfo().getNetworkSettings() + .getPorts().getBindings().get(new ExposedPort(uri.getPort())); + + Map ss = node.getContainerInfo().getNetworkSettings().getNetworks(); + ContainerNetwork s = ss.values().iterator().next(); + + if (uri.getPort() == 6379 && node.getNetworkAliases().contains("slave0")) { + return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); +// System.out.println("attempt " + uri + " - " + node.getNetworkAliases() + " " + node.isHostAccessible() +" " + node.isRunning()); + } + + if ("redis".equals(uri.getHost()) + && node.getNetworkAliases().contains(uri.getHost())) { + return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); + } + + if (mappedPort != null + && s.getIpAddress().equals(uri.getHost())) { + return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); + } + } + return uri; + } + }) + .addSentinelAddress("redis://127.0.0.1:" + sentinel1.getFirstMappedPort()) + .setMasterName("mymaster"); + RedissonClient redisson = Redisson.create(config); - - final AtomicBoolean executed = new AtomicBoolean(); - final AtomicInteger subscriptions = new AtomicInteger(); - - RTopic topic = redisson.getTopic("topic"); - topic.addListener(new StatusListener() { - - @Override - public void onUnsubscribe(String channel) { - } - - @Override - public void onSubscribe(String channel) { - subscriptions.incrementAndGet(); + + callback.accept(nodes, redisson); + + redisson.shutdown(); + nodes.forEach(n -> n.stop()); + network.close(); + } + + + @Test + public void testReattachInSentinel() throws Exception { + withSentinel((nodes, redisson) -> { + final AtomicBoolean executed = new AtomicBoolean(); + final AtomicInteger subscriptions = new AtomicInteger(); + + RTopic topic = redisson.getTopic("topic"); + topic.addListener(new StatusListener() { + + @Override + public void onUnsubscribe(String channel) { + } + + @Override + public void onSubscribe(String channel) { + subscriptions.incrementAndGet(); + } + }); + topic.addListener(Integer.class, new MessageListener() { + @Override + public void onMessage(CharSequence channel, Integer msg) { + executed.set(true); + } + }); + + sendCommands(redisson, "topic"); + + nodes.forEach(n -> n.stop()); + + try { + TimeUnit.SECONDS.sleep(20); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }); - topic.addListener(Integer.class, new MessageListener() { - @Override - public void onMessage(CharSequence channel, Integer msg) { - executed.set(true); + + nodes.forEach(n -> n.start()); + + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }); - - sendCommands(redisson, "topic"); - - sentinel1.stop(); - sentinel2.stop(); - sentinel3.stop(); - master.stop(); - slave1.stop(); - slave2.stop(); - - Thread.sleep(TimeUnit.SECONDS.toMillis(20)); - - master = new RedisRunner() - .port(6390) - .nosave() - .randomDir() - .run(); - slave1 = new RedisRunner() - .port(6391) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6390) - .run(); - slave2 = new RedisRunner() - .port(6392) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6390) - .run(); - sentinel1 = new RedisRunner() - .nosave() - .randomDir() - .port(26379) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6390, 2) - .run(); - sentinel2 = new RedisRunner() - .nosave() - .randomDir() - .port(26380) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6390, 2) - .run(); - sentinel3 = new RedisRunner() - .nosave() - .randomDir() - .port(26381) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6390, 2) - .run(); - - redisson.getTopic("topic").publish(1); - - await().atMost(20, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); - assertThat(executed.get()).isTrue(); - redisson.shutdown(); - sentinel1.stop(); - sentinel2.stop(); - sentinel3.stop(); - master.stop(); - slave1.stop(); - slave2.stop(); + redisson.getTopic("topic").publish(1); + + await().atMost(20, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); + assertThat(executed.get()).isTrue(); + + }, 2); } // @Test @@ -1336,69 +1397,51 @@ public class RedissonTopicTest { @Test public void testClusterSharding() throws IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterProcesses process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - AtomicInteger counter = new AtomicInteger(); - for (int i = 0; i < 10; i++) { - int j = i; - RTopic topic = redisson.getTopic("test" + i); - topic.addListener(Integer.class, (c, v) -> { - assertThat(v).isEqualTo(j); - counter.incrementAndGet(); - }); - } + testInCluster(redisson -> { + AtomicInteger counter = new AtomicInteger(); + for (int i = 0; i < 10; i++) { + int j = i; + RTopic topic = redisson.getTopic("test" + i); + topic.addListener(Integer.class, (c, v) -> { + assertThat(v).isEqualTo(j); + counter.incrementAndGet(); + }); + } - for (int i = 0; i < 10; i++) { - RTopic topic = redisson.getTopic("test" + i); - topic.publish(i); - } + for (int i = 0; i < 10; i++) { + RTopic topic = redisson.getTopic("test" + i); + topic.publish(i); + } - Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> counter.get() == 10); + Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> counter.get() == 10); - redisson.shutdown(); - process.shutdown(); + for (int i = 0; i < 10; i++) { + RTopic topic = redisson.getTopic("test" + i); + topic.removeAllListeners(); + } + }); } @Test public void testReattachInClusterSlave() throws Exception { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); - - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterProcesses process = clusterRunner.run(); - - Thread.sleep(1000); + GenericContainer redisCluster = new GenericContainer<>("vishnunair/docker-redis-cluster") + .withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384) + .withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10))); + redisCluster.start(); Config config = new Config(); config.useClusterServers() + .setNatMapper(new NatMapper() { + @Override + public RedisURI map(RedisURI uri) { + if (redisCluster.getMappedPort(uri.getPort()) == null) { + return uri; + } + return new RedisURI(uri.getScheme(), redisCluster.getHost(), redisCluster.getMappedPort(uri.getPort())); + } + }) .setSubscriptionMode(SubscriptionMode.SLAVE) - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + .addNodeAddress("redis://127.0.0.1:" + redisCluster.getFirstMappedPort()); RedissonClient redisson = Redisson.create(config); final AtomicBoolean executed = new AtomicBoolean(); @@ -1425,17 +1468,16 @@ public class RedissonTopicTest { assertThat(topic.countListeners()).isEqualTo(2); sendCommands(redisson, "topic"); - - process.getNodes().stream().filter(x -> Arrays.asList(slave1.getPort(), slave2.getPort(), slave3.getPort()).contains(x.getRedisServerPort())) - .forEach(x -> { - try { - x.stop(); - Thread.sleep(18000); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - }); + + RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); + for (RedisClusterSlave slave : nodes.getSlaves()) { + RedisClientConfig cc = new RedisClientConfig(); + cc.setAddress("redis://" + slave.getAddr().getHostString() + ":" + slave.getAddr().getPort()); + RedisClient c = RedisClient.create(cc); + c.connect().async(RedisCommands.SHUTDOWN); + c.shutdown(); + Thread.sleep(18000); + } Thread.sleep(15000); @@ -1446,7 +1488,7 @@ public class RedissonTopicTest { assertThat(executed.get()).isTrue(); redisson.shutdown(); - process.shutdown(); + redisCluster.stop(); } @Test @@ -1566,85 +1608,104 @@ public class RedissonTopicTest { @Test public void testReattachInClusterMaster2() throws Exception { - RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); - RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); - RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterProcesses process = clusterRunner.run(); + withCluster(redisson -> { + Queue messages = new ConcurrentLinkedQueue<>(); + Queue subscriptions = new ConcurrentLinkedQueue<>(); - Config config = new Config(); - config.useClusterServers() - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - Queue messages = new ConcurrentLinkedQueue<>(); - Queue subscriptions = new ConcurrentLinkedQueue<>(); + int topicsAmount = 100; + for (int i = 0; i < topicsAmount; i++) { + RTopic topic = redisson.getTopic("topic" + i); + int finalI = i; + topic.addListener(new StatusListener() { - int topicsAmount = 100; - for (int i = 0; i < topicsAmount; i++) { - RTopic topic = redisson.getTopic("topic" + i); - int finalI = i; - topic.addListener(new StatusListener() { + @Override + public void onUnsubscribe(String channel) { + } - @Override - public void onUnsubscribe(String channel) { - } + @Override + public void onSubscribe(String channel) { + subscriptions.add("topic" + finalI); + } + }); + topic.addListener(String.class, (channel, msg) -> messages.add(msg)); + } - @Override - public void onSubscribe(String channel) { - subscriptions.add("topic" + finalI); - } + RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); + nodes.getMasters().forEach(m -> { + RedisClientConfig cc = new RedisClientConfig(); + cc.setAddress("redis://" + m.getAddr().getHostString() + ":" + m.getAddr().getPort()); + RedisClient c = RedisClient.create(cc); + c.connect().async(RedisCommands.SHUTDOWN); + c.shutdown(); }); - topic.addListener(String.class, (channel, msg) -> messages.add(msg)); - } - RedisRunner.RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get(); - master.stop(); + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(30)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - Thread.sleep(TimeUnit.SECONDS.toMillis(30)); + assertThat(subscriptions).hasSize(topicsAmount*2); - assertThat(subscriptions).hasSize(140); + for (int i = 0; i < topicsAmount; i++) { + RTopic topic = redisson.getTopic("topic" + i); + topic.publish("topic" + i); + } - for (int i = 0; i < topicsAmount; i++) { - RTopic topic = redisson.getTopic("topic" + i); - topic.publish("topic" + i); - } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertThat(messages).hasSize(topicsAmount); + }); + } - Thread.sleep(100); - assertThat(messages).hasSize(topicsAmount); + public void withCluster(Consumer callback) { + GenericContainer redisCluster = new GenericContainer<>("vishnunair/docker-redis-cluster") + .withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384) + .withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10))); + redisCluster.start(); + Config config = new Config(); + config.useClusterServers() + .setNatMapper(new NatMapper() { + @Override + public RedisURI map(RedisURI uri) { + if (redisCluster.getMappedPort(uri.getPort()) == null) { + return uri; + } + return new RedisURI(uri.getScheme(), redisCluster.getHost(), redisCluster.getMappedPort(uri.getPort())); + } + }) + .addNodeAddress("redis://127.0.0.1:" + redisCluster.getFirstMappedPort()); + + RedissonClient redisson = Redisson.create(config); + callback.accept(redisson); redisson.shutdown(); - process.shutdown(); + redisCluster.stop(); } @Test public void testReattachInClusterMaster() throws Exception { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + GenericContainer redisCluster = new GenericContainer<>("vishnunair/docker-redis-cluster") + .withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384) + .withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10))); + redisCluster.start(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterProcesses process = clusterRunner.run(); - Config config = new Config(); config.useClusterServers() .setSubscriptionMode(SubscriptionMode.MASTER) - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + .setNatMapper(new NatMapper() { + @Override + public RedisURI map(RedisURI uri) { + if (redisCluster.getMappedPort(uri.getPort()) == null) { + return uri; + } + return new RedisURI(uri.getScheme(), redisCluster.getHost(), redisCluster.getMappedPort(uri.getPort())); + } + }) + .addNodeAddress("redis://127.0.0.1:" + redisCluster.getFirstMappedPort()); RedissonClient redisson = Redisson.create(config); final AtomicBoolean executed = new AtomicBoolean(); @@ -1670,17 +1731,20 @@ public class RedissonTopicTest { }); sendCommands(redisson, "3"); - - process.getNodes().stream().filter(x -> master1.getPort() == x.getRedisServerPort()) - .forEach(x -> { - try { - x.stop(); - Thread.sleep(18000); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - }); + + RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); + nodes.getMasters().forEach(m -> { + RedisClientConfig cc = new RedisClientConfig(); + cc.setAddress("redis://" + m.getAddr().getHostString() + ":" + m.getAddr().getPort()); + RedisClient c = RedisClient.create(cc); + c.connect().async(RedisCommands.SHUTDOWN); + c.shutdown(); + try { + Thread.sleep(18000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); Thread.sleep(25000); @@ -1690,7 +1754,7 @@ public class RedissonTopicTest { assertThat(executed.get()).isTrue(); redisson.shutdown(); - process.shutdown(); + redisCluster.stop(); } @Test diff --git a/redisson/src/test/java/org/redisson/RedissonTransferQueueTest.java b/redisson/src/test/java/org/redisson/RedissonTransferQueueTest.java index 8f08b40df..2d30b3cea 100644 --- a/redisson/src/test/java/org/redisson/RedissonTransferQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTransferQueueTest.java @@ -1,9 +1,11 @@ package org.redisson; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.redisson.api.RTransferQueue; +import java.time.Duration; import java.util.HashSet; import java.util.Iterator; import java.util.Set; @@ -15,7 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** * @author Nikita Koksharov */ -public class RedissonTransferQueueTest extends BaseTest { +public class RedissonTransferQueueTest extends RedisDockerTest { @Test public void testTryTransferWithDelay() throws InterruptedException, ExecutionException { @@ -93,12 +95,11 @@ public class RedissonTransferQueueTest extends BaseTest { assertThat(res2).isFalse(); }, 4, TimeUnit.SECONDS); - long s = System.currentTimeMillis(); - int l = queue1.take(); - takeExecuted.set(true); - - Assertions.assertEquals(3, l); - Assertions.assertTrue(System.currentTimeMillis() - s > 3900); + Awaitility.await().atLeast(Duration.ofMillis(3900)).untilAsserted(() -> { + int l = queue1.take(); + takeExecuted.set(true); + assertThat(l).isEqualTo(3); + }); f.get(); assertThat(queue1.size()).isZero(); assertThat(queue1.peek()).isNull(); diff --git a/redisson/src/test/java/org/redisson/jcache/JCacheTest.java b/redisson/src/test/java/org/redisson/jcache/JCacheTest.java index f3c2ec31b..678f514d5 100644 --- a/redisson/src/test/java/org/redisson/jcache/JCacheTest.java +++ b/redisson/src/test/java/org/redisson/jcache/JCacheTest.java @@ -1,61 +1,67 @@ package org.redisson.jcache; -import static java.util.concurrent.TimeUnit.*; -import static org.assertj.core.api.Assertions.assertThat; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.Redisson; +import org.redisson.api.CacheAsync; +import org.redisson.api.CacheReactive; +import org.redisson.api.CacheRx; +import org.redisson.api.RedissonClient; +import org.redisson.codec.TypedJsonJacksonCodec; +import org.redisson.config.Config; +import org.redisson.jcache.configuration.RedissonConfiguration; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import javax.cache.Cache; +import javax.cache.Caching; +import javax.cache.configuration.*; +import javax.cache.event.*; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import javax.cache.integration.CacheLoader; +import javax.cache.integration.CacheLoaderException; import java.io.IOException; import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.time.LocalDateTime; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import javax.cache.Cache; -import javax.cache.Caching; -import javax.cache.configuration.*; -import javax.cache.event.CacheEntryEvent; -import javax.cache.event.CacheEntryExpiredListener; -import javax.cache.event.CacheEntryListenerException; -import javax.cache.event.CacheEntryRemovedListener; -import javax.cache.event.CacheEntryUpdatedListener; -import javax.cache.expiry.CreatedExpiryPolicy; -import javax.cache.expiry.Duration; -import javax.cache.integration.CacheLoader; -import javax.cache.integration.CacheLoaderException; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.assertj.core.api.Assertions.assertThat; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.redisson.BaseTest; -import org.redisson.RedisRunner; -import org.redisson.RedisRunner.FailedToStartRedisException; -import org.redisson.RedisRunner.RedisProcess; -import org.redisson.api.CacheAsync; -import org.redisson.api.CacheReactive; -import org.redisson.api.CacheRx; -import org.redisson.codec.TypedJsonJacksonCodec; -import org.redisson.config.Config; -import org.redisson.jcache.configuration.RedissonConfiguration; +@Testcontainers +public class JCacheTest { -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + @Container + private static final GenericContainer REDIS = + new GenericContainer<>("redis:7.2") + .withCreateContainerCmdModifier(cmd -> { + cmd.withCmd("redis-server", "--save", "''"); + }) + .withExposedPorts(6379); -public class JCacheTest extends BaseTest { + static { + REDIS.setPortBindings(Arrays.asList("6311:6379")); + } + + @BeforeEach + public void beforeEach() throws IOException, InterruptedException { + org.testcontainers.containers.Container.ExecResult r = REDIS.execInContainer("redis-cli", "flushall"); + assertThat(r.getExitCode()).isEqualTo(0); + } @Test public void testCreatedExpiryPolicy() throws Exception { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - URL configUrl = getClass().getResource("redisson-jcache.yaml"); Config cfg = Config.fromYAML(configUrl); @@ -80,17 +86,10 @@ public class JCacheTest extends BaseTest { assertThat(cache.get("1")).isEqualTo("5"); cache.close(); - runner.stop(); } @Test public void testClear() throws Exception { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - URL configUrl = getClass().getResource("redisson-jcache.yaml"); Config cfg = Config.fromYAML(configUrl); @@ -103,17 +102,10 @@ public class JCacheTest extends BaseTest { assertThat(cache.get("1")).isNull(); cache.close(); - runner.stop(); } @Test public void testAsync() throws Exception { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - URL configUrl = getClass().getResource("redisson-jcache.yaml"); Config cfg = Config.fromYAML(configUrl); @@ -126,17 +118,10 @@ public class JCacheTest extends BaseTest { assertThat(async.getAsync("1").get()).isEqualTo("2"); cache.close(); - runner.stop(); } @Test public void testReactive() throws Exception { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - URL configUrl = getClass().getResource("redisson-jcache.yaml"); Config cfg = Config.fromYAML(configUrl); @@ -149,17 +134,10 @@ public class JCacheTest extends BaseTest { assertThat(reactive.get("1").block()).isEqualTo("2"); cache.close(); - runner.stop(); } @Test public void testRx() throws Exception { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - URL configUrl = getClass().getResource("redisson-jcache.yaml"); Config cfg = Config.fromYAML(configUrl); @@ -172,17 +150,10 @@ public class JCacheTest extends BaseTest { assertThat(rx.get("1").blockingGet()).isEqualTo("2"); cache.close(); - runner.stop(); } @Test public void testPutAll() throws Exception { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - URL configUrl = getClass().getResource("redisson-jcache.yaml"); Config cfg = Config.fromYAML(configUrl); @@ -204,17 +175,10 @@ public class JCacheTest extends BaseTest { } cache.close(); - runner.stop(); } @Test public void testRemoveAll() throws Exception { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - URL configUrl = getClass().getResource("redisson-jcache.yaml"); Config cfg = Config.fromYAML(configUrl); @@ -235,17 +199,10 @@ public class JCacheTest extends BaseTest { assertThat(cache.containsKey("5")).isFalse(); cache.close(); - runner.stop(); } @Test public void testGetAllHighVolume() throws Exception { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - URL configUrl = getClass().getResource("redisson-jcache.yaml"); Config cfg = Config.fromYAML(configUrl); @@ -263,17 +220,10 @@ public class JCacheTest extends BaseTest { assertThat(entries).isEqualTo(m); cache.close(); - runner.stop(); } @Test public void testGetAll() throws Exception { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - URL configUrl = getClass().getResource("redisson-jcache.yaml"); Config cfg = Config.fromYAML(configUrl); @@ -291,17 +241,10 @@ public class JCacheTest extends BaseTest { assertThat(entries).isEqualTo(expected); cache.close(); - runner.stop(); } @Test public void testGetAllCacheLoader() throws Exception { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - URL configUrl = getClass().getResource("redisson-jcache.yaml"); Config cfg = Config.fromYAML(configUrl); @@ -343,17 +286,10 @@ public class JCacheTest extends BaseTest { assertThat(entries).isEqualTo(expected); cache.close(); - runner.stop(); } @Test public void testJson() throws InterruptedException, IllegalArgumentException, URISyntaxException, IOException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - URL configUrl = getClass().getResource("redisson-jcache.yaml"); Config cfg = Config.fromYAML(configUrl); ObjectMapper objectMapper = new ObjectMapper(); @@ -369,17 +305,10 @@ public class JCacheTest extends BaseTest { Assertions.assertEquals(t, cache.get("1")); cache.close(); - runner.stop(); } @Test public void testRedissonConfig() throws InterruptedException, IllegalArgumentException, IOException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - URL configUrl = getClass().getResource("redisson-jcache.yaml"); Config cfg = Config.fromYAML(configUrl); @@ -401,17 +330,10 @@ public class JCacheTest extends BaseTest { Assertions.assertNull(cache.get("key")); cache.close(); - runner.stop(); } @Test public void testScriptCache() throws IOException, InterruptedException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - URL configUrl = getClass().getResource("redisson-jcache.yaml"); Config cfg = Config.fromYAML(configUrl); cfg.setUseScriptCache(true); @@ -424,11 +346,13 @@ public class JCacheTest extends BaseTest { Assertions.assertEquals("2", cache.get("1")); cache.close(); - runner.stop(); } @Test public void testRedissonInstance() throws IllegalArgumentException { + Config cfg = new Config(); + cfg.useSingleServer().setAddress("redis://127.0.0.1:6311"); + RedissonClient redisson = Redisson.create(cfg); Configuration config = RedissonConfiguration.fromInstance(redisson); Cache cache = Caching.getCachingProvider().getCacheManager() .createCache("test", config); @@ -437,16 +361,11 @@ public class JCacheTest extends BaseTest { Assertions.assertEquals("2", cache.get("1")); cache.close(); + redisson.shutdown(); } @Test public void testExpiration() throws InterruptedException, IllegalArgumentException, URISyntaxException, FailedToStartRedisException, IOException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - MutableConfiguration config = new MutableConfiguration<>(); config.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1))); config.setStoreByValue(true); @@ -471,17 +390,10 @@ public class JCacheTest extends BaseTest { Assertions.assertNull(cache.get(key)); cache.close(); - runner.stop(); } @Test public void testUpdate() throws IOException, InterruptedException, URISyntaxException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - MutableConfiguration config = new MutableConfiguration<>(); config.setStoreByValue(true); @@ -508,17 +420,10 @@ public class JCacheTest extends BaseTest { assertThat(cache.get(key)).isNotNull(); cache.close(); - runner.stop(); } @Test public void testUpdateAsync() throws IOException, InterruptedException, URISyntaxException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - MutableConfiguration config = new MutableConfiguration<>(); config.setStoreByValue(true); @@ -550,17 +455,10 @@ public class JCacheTest extends BaseTest { assertThat(cache.get(key)).isNotNull(); cache.close(); - runner.stop(); } @Test public void testUpdateWithoutOldValue() throws IOException, InterruptedException, URISyntaxException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - MutableConfiguration config = new MutableConfiguration<>(); config.setStoreByValue(true); @@ -587,17 +485,10 @@ public class JCacheTest extends BaseTest { assertThat(cache.get(key)).isNotNull(); cache.close(); - runner.stop(); } @Test public void testRemoveListener() throws IOException, InterruptedException, URISyntaxException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .port(6311) - .run(); - MutableConfiguration config = new MutableConfiguration<>(); config.setStoreByValue(true); @@ -624,7 +515,6 @@ public class JCacheTest extends BaseTest { assertThat(cache.get(key)).isNull(); cache.close(); - runner.stop(); } public static class ExpiredListener implements CacheEntryExpiredListener, Serializable {