diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index d34788180..60d14b417 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -190,7 +190,7 @@ public class PublishSubscribeService { return new PubSubPatternStatusListener((PubSubPatternStatusListener) l) { @Override public void onStatus(PubSubType type, CharSequence channel) { - if (statusCounter.decrementAndGet() == 0) { + if (statusCounter.get() == 0 || statusCounter.decrementAndGet() == 0) { super.onStatus(type, channel); } } @@ -224,8 +224,8 @@ public class PublishSubscribeService { public boolean isMultiEntity(ChannelName channelName) { return connectionManager.isClusterMode() - && (channelName.toString().startsWith("__keyspace@") - || channelName.toString().startsWith("__keyevent@")); + && (channelName.toString().startsWith("__keyspace") + || channelName.toString().startsWith("__keyevent")); } public CompletableFuture subscribe(MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, @@ -356,7 +356,7 @@ public class PublishSubscribeService { return new PubSubStatusListener(((PubSubStatusListener) l).getListener(), ((PubSubStatusListener) l).getName()) { @Override public void onStatus(PubSubType type, CharSequence channel) { - if (statusCounter.decrementAndGet() == 0) { + if (statusCounter.get() == 0 || statusCounter.decrementAndGet() == 0) { super.onStatus(type, channel); } } diff --git a/redisson/src/test/java/org/redisson/RedisDockerTest.java b/redisson/src/test/java/org/redisson/RedisDockerTest.java index 0e0aad2b7..b4455e6db 100644 --- a/redisson/src/test/java/org/redisson/RedisDockerTest.java +++ b/redisson/src/test/java/org/redisson/RedisDockerTest.java @@ -1,5 +1,10 @@ package org.redisson; +import com.github.dockerjava.api.command.InspectContainerResponse; +import com.github.dockerjava.api.model.ContainerNetwork; +import com.github.dockerjava.api.model.ExposedPort; +import com.github.dockerjava.api.model.PortBinding; +import com.github.dockerjava.api.model.Ports; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.redisson.api.NatMapper; @@ -7,20 +12,24 @@ import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.redisson.config.Protocol; import org.redisson.misc.RedisURI; +import org.testcontainers.containers.ContainerState; +import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import java.io.File; import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.*; +import java.util.function.BiConsumer; import java.util.function.Consumer; public class RedisDockerTest { protected static final String NOTIFY_KEYSPACE_EVENTS = "--notify-keyspace-events"; - private static final GenericContainer REDIS = createRedis(); + protected static final GenericContainer REDIS = createRedis(); protected static final Protocol protocol = Protocol.RESP2; @@ -35,9 +44,16 @@ public class RedisDockerTest { } protected static GenericContainer createRedis(String version) { + return createRedis(version, "--save", ""); + } + + protected static GenericContainer createRedis(String version, String... params) { return new GenericContainer<>("redis:" + version) .withCreateContainerCmdModifier(cmd -> { - cmd.withCmd("redis-server", "--save", "''"); + List args = new ArrayList<>(); + args.add("redis-server"); + args.addAll(Arrays.asList(params)); + cmd.withCmd(args); }) .withExposedPorts(6379); } @@ -65,10 +81,14 @@ public class RedisDockerTest { } protected static Config createConfig() { + return createConfig(REDIS); + } + + protected static Config createConfig(GenericContainer container) { Config config = new Config(); config.setProtocol(protocol); config.useSingleServer() - .setAddress("redis://127.0.0.1:" + REDIS.getFirstMappedPort()); + .setAddress("redis://127.0.0.1:" + container.getFirstMappedPort()); return config; } @@ -77,6 +97,29 @@ public class RedisDockerTest { return Redisson.create(config); } + protected void withRedisParams(Consumer redissonCallback, String... params) { + GenericContainer redis = + new GenericContainer<>("redis:7.2") + .withCreateContainerCmdModifier(cmd -> { + List args = new ArrayList<>(); + args.add("redis-server"); + args.addAll(Arrays.asList(params)); + cmd.withCmd(args); + }) + .withExposedPorts(6379); + redis.start(); + + Config config = new Config(); + config.setProtocol(protocol); + config.useSingleServer().setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort()); + + try { + redissonCallback.accept(config); + } finally { + redis.stop(); + } + } + protected void testWithParams(Consumer redissonCallback, String... params) { GenericContainer redis = new GenericContainer<>("redis:7.2") @@ -100,7 +143,6 @@ public class RedisDockerTest { redisson.shutdown(); redis.stop(); } - } protected void testInCluster(Consumer redissonCallback) { @@ -137,4 +179,200 @@ public class RedisDockerTest { } } + protected void withSentinel(BiConsumer>, Config> callback, int slaves) throws InterruptedException { + Network network = Network.newNetwork(); + + List>> nodes = new ArrayList<>(); + + GenericContainer master = + new GenericContainer<>("bitnami/redis:7.2.4") + .withNetwork(network) + .withEnv("REDIS_REPLICATION_MODE", "master") + .withEnv("ALLOW_EMPTY_PASSWORD", "yes") + .withNetworkAliases("redis") + .withExposedPorts(6379); + master.start(); + assert master.getNetwork() == network; + int masterPort = master.getFirstMappedPort(); + master.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(masterPort)), + cmd.getExposedPorts()[0])); + }); + nodes.add(master); + + for (int i = 0; i < slaves; i++) { + GenericContainer slave = + new GenericContainer<>("bitnami/redis:7.2.4") + .withNetwork(network) + .withEnv("REDIS_REPLICATION_MODE", "slave") + .withEnv("REDIS_MASTER_HOST", "redis") + .withEnv("ALLOW_EMPTY_PASSWORD", "yes") + .withNetworkAliases("slave" + i) + .withExposedPorts(6379); + slave.start(); + int slavePort = slave.getFirstMappedPort(); + slave.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(slavePort)), + cmd.getExposedPorts()[0])); + }); + nodes.add(slave); + } + + GenericContainer sentinel1 = + new GenericContainer<>("bitnami/redis-sentinel:7.2.4") + + .withNetwork(network) + .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") + .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") + .withNetworkAliases("sentinel1") + .withExposedPorts(26379); + sentinel1.start(); + int sentinel1Port = sentinel1.getFirstMappedPort(); + sentinel1.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel1Port)), + cmd.getExposedPorts()[0])); + }); + nodes.add(sentinel1); + + GenericContainer sentinel2 = + new GenericContainer<>("bitnami/redis-sentinel:7.2.4") + .withNetwork(network) + .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") + .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") + .withNetworkAliases("sentinel2") + .withExposedPorts(26379); + sentinel2.start(); + int sentinel2Port = sentinel2.getFirstMappedPort(); + sentinel2.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel2Port)), + cmd.getExposedPorts()[0])); + }); + nodes.add(sentinel2); + + GenericContainer sentinel3 = + new GenericContainer<>("bitnami/redis-sentinel:7.2.4") + .withNetwork(network) + .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") + .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") + .withNetworkAliases("sentinel3") + .withExposedPorts(26379); + sentinel3.start(); + int sentinel3Port = sentinel3.getFirstMappedPort(); + sentinel3.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel3Port)), + cmd.getExposedPorts()[0])); + }); + nodes.add(sentinel3); + + Thread.sleep(5000); + + Config config = new Config(); + config.setProtocol(protocol); + config.useSentinelServers() + .setNatMapper(new NatMapper() { + + @Override + public RedisURI map(RedisURI uri) { + for (GenericContainer> node : nodes) { + if (node.getContainerInfo() == null) { + continue; + } + + Ports.Binding[] mappedPort = node.getContainerInfo().getNetworkSettings() + .getPorts().getBindings().get(new ExposedPort(uri.getPort())); + + Map ss = node.getContainerInfo().getNetworkSettings().getNetworks(); + ContainerNetwork s = ss.values().iterator().next(); + + if (uri.getPort() == 6379 && node.getNetworkAliases().contains("slave0")) { + return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); + } + + if (mappedPort != null + && s.getIpAddress().equals(uri.getHost())) { + return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); + } + } + return uri; + } + }) + .addSentinelAddress("redis://127.0.0.1:" + sentinel1.getFirstMappedPort()) + .setMasterName("mymaster"); + + callback.accept(nodes, config); + + nodes.forEach(n -> n.stop()); + network.close(); + } + + protected void withNewCluster(Consumer callback) { + List nodes = new ArrayList<>(); + + LogMessageWaitStrategy wait2 = new LogMessageWaitStrategy().withRegEx(".*REPLICA\ssync\\:\sFinished\swith\ssuccess.*"); + + DockerComposeContainer environment = + new DockerComposeContainer(new File("src/test/resources/docker-compose.yml")) + .withExposedService("redis-node-0", 6379) + .withExposedService("redis-node-1", 6379) + .withExposedService("redis-node-2", 6379) + .withExposedService("redis-node-3", 6379) + .withExposedService("redis-node-4", 6379) + .withExposedService("redis-node-5", 6379, wait2); + + environment.start(); + + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + for (int i = 0; i < 6; i++) { + Optional cc = environment.getContainerByServiceName("redis-node-" + i); + nodes.add(cc.get().getContainerInfo()); + } + + Optional cc2 = environment.getContainerByServiceName("redis-node-0"); + + Config config = new Config(); + config.useClusterServers() + .setNatMapper(new NatMapper() { + + @Override + public RedisURI map(RedisURI uri) { + for (InspectContainerResponse node : nodes) { + Ports.Binding[] mappedPort = node.getNetworkSettings() + .getPorts().getBindings().get(new ExposedPort(uri.getPort())); + + Map ss = node.getNetworkSettings().getNetworks(); + ContainerNetwork s = ss.values().iterator().next(); + + if (mappedPort != null + && s.getIpAddress().equals(uri.getHost())) { + return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); + } + } + return uri; + } + }) + .addNodeAddress("redis://127.0.0.1:" + cc2.get().getFirstMappedPort()); + + RedissonClient redisson = Redisson.create(config); + + callback.accept(redisson); + redisson.shutdown(); + environment.stop(); + } + + protected void restart(GenericContainer redis) { + redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":6379")); + redis.stop(); + redis.start(); + } + } diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index fdba74cd3..cbdbd91ea 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -20,9 +20,6 @@ import org.redisson.codec.JsonJacksonCodec; import org.redisson.command.BatchPromise; import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; -import org.redisson.misc.RedisURI; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; import java.time.Duration; import java.util.*; @@ -45,100 +42,89 @@ public class RedissonBatchTest extends RedisDockerTest { @ParameterizedTest @MethodSource("data") - public void testSlotMigrationInCluster(BatchOptions batchOptions) throws Exception { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slot1) - .addNode(master2, slot2) - .addNode(master3, slot3); - ClusterRunner.ClusterProcesses process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setScanInterval(1000) - .setSubscriptionMode(SubscriptionMode.MASTER) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - RedisClientConfig cfg = new RedisClientConfig(); - cfg.setAddress(process.getNodes().iterator().next().getRedisServerAddressAndPort()); - RedisClient c = RedisClient.create(cfg); - RedisConnection cc = c.connect(); - List mastersList = cc.sync(RedisCommands.CLUSTER_NODES); - mastersList = mastersList.stream().filter(i -> i.containsFlag(ClusterNodeInfo.Flag.MASTER)).collect(Collectors.toList()); - c.shutdown(); - - ClusterNodeInfo destination = mastersList.stream().filter(i -> i.getSlotRanges().iterator().next().getStartSlot() != 10922).findAny().get(); - ClusterNodeInfo source = mastersList.stream().filter(i -> i.getSlotRanges().iterator().next().getStartSlot() == 10922).findAny().get(); + public void testSlotMigrationInCluster(BatchOptions batchOptions) { + withNewCluster(redissonClient -> { + Config config = redissonClient.getConfig(); + config.useClusterServers() + .setScanInterval(1000) + .setSubscriptionMode(SubscriptionMode.MASTER); + RedissonClient redisson = Redisson.create(config); - RedisClientConfig sourceCfg = new RedisClientConfig(); - sourceCfg.setAddress(source.getAddress()); - RedisClient sourceClient = RedisClient.create(sourceCfg); - RedisConnection sourceConnection = sourceClient.connect(); + RedisClientConfig cfg = new RedisClientConfig(); + cfg.setAddress(config.useClusterServers().getNodeAddresses().get(0)); + RedisClient c = RedisClient.create(cfg); + RedisConnection cc = c.connect(); + List mastersList = cc.sync(RedisCommands.CLUSTER_NODES); + mastersList = mastersList.stream().filter(i -> i.containsFlag(ClusterNodeInfo.Flag.MASTER)).collect(Collectors.toList()); + c.shutdown(); - RedisClientConfig destinationCfg = new RedisClientConfig(); - destinationCfg.setAddress(destination.getAddress()); - RedisClient destinationClient = RedisClient.create(destinationCfg); - RedisConnection destinationConnection = destinationClient.connect(); + ClusterNodeInfo destination = mastersList.stream().filter(i -> i.getSlotRanges().stream().noneMatch(s -> s.hasSlot(10922))).findAny().get(); + ClusterNodeInfo source = mastersList.stream().filter(i -> i.getSlotRanges().stream().anyMatch(s -> s.hasSlot(10922))).findAny().get(); - String lockName = "test{kaO}"; + RedisClientConfig sourceCfg = new RedisClientConfig(); + sourceCfg.setAddress(config.useClusterServers().getNatMapper().map(source.getAddress())); + RedisClient sourceClient = RedisClient.create(sourceCfg); + RedisConnection sourceConnection = sourceClient.connect(); - RBatch batch = redisson.createBatch(batchOptions); - List> futures = new ArrayList<>(); - for (int i = 0; i < 5; i++) { - RFuture f = batch.getMap(lockName).fastPutAsync("" + i, i); - futures.add(f); - } + RedisClientConfig destinationCfg = new RedisClientConfig(); + destinationCfg.setAddress(config.useClusterServers().getNatMapper().map(destination.getAddress())); + RedisClient destinationClient = RedisClient.create(destinationCfg); + RedisConnection destinationConnection = destinationClient.connect(); - destinationConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "IMPORTING", source.getNodeId()); - sourceConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "MIGRATING", destination.getNodeId()); - - List keys = sourceConnection.sync(RedisCommands.CLUSTER_GETKEYSINSLOT, source.getSlotRanges().iterator().next().getStartSlot(), 100); - List params = new ArrayList(); - params.add(destination.getAddress().getHost()); - params.add(destination.getAddress().getPort()); - params.add(""); - params.add(0); - params.add(2000); - params.add("KEYS"); - params.addAll(keys); - sourceConnection.async(RedisCommands.MIGRATE, params.toArray()); - - for (ClusterNodeInfo node : mastersList) { - RedisClientConfig cc1 = new RedisClientConfig(); - cc1.setAddress(node.getAddress()); - RedisClient ccc = RedisClient.create(cc1); - RedisConnection connection = ccc.connect(); - connection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "NODE", destination.getNodeId()); - ccc.shutdownAsync(); - } + String lockName = "test{kaO}"; - Thread.sleep(2000); + RBatch batch = redisson.createBatch(batchOptions); + List> futures = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + RFuture f = batch.getMap(lockName).fastPutAsync("" + i, i); + futures.add(f); + } - batch.execute(); + destinationConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "IMPORTING", source.getNodeId()); + sourceConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "MIGRATING", destination.getNodeId()); + + List keys = sourceConnection.sync(RedisCommands.CLUSTER_GETKEYSINSLOT, source.getSlotRanges().iterator().next().getStartSlot(), 100); + List params = new ArrayList(); + params.add(destination.getAddress().getHost()); + params.add(destination.getAddress().getPort()); + params.add(""); + params.add(0); + params.add(2000); + params.add("KEYS"); + params.addAll(keys); + sourceConnection.async(RedisCommands.MIGRATE, params.toArray()); + + for (ClusterNodeInfo node : mastersList) { + RedisClientConfig cc1 = new RedisClientConfig(); + cc1.setAddress(config.useClusterServers().getNatMapper().map(node.getAddress())); + RedisClient ccc = RedisClient.create(cc1); + RedisConnection connection = ccc.connect(); + connection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "NODE", destination.getNodeId()); + ccc.shutdownAsync(); + } - futures.forEach(f -> { try { - f.toCompletableFuture().get(1, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - org.junit.jupiter.api.Assertions.fail(e); - } catch (ExecutionException e) { - e.printStackTrace(); + Thread.sleep(2000); } catch (InterruptedException e) { - e.printStackTrace(); + throw new RuntimeException(e); } - }); - sourceClient.shutdown(); - destinationClient.shutdown(); - redisson.shutdown(); - process.shutdown(); + batch.execute(); + + futures.forEach(f -> { + try { + f.toCompletableFuture().get(1, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + org.junit.jupiter.api.Assertions.fail(e); + } catch (Exception e) { + // skip + } + }); + + sourceClient.shutdown(); + destinationClient.shutdown(); + redisson.shutdown(); + }); } @ParameterizedTest diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 2f4a7450d..ecfb8f042 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -2,17 +2,21 @@ package org.redisson; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import org.redisson.ClusterRunner.ClusterProcesses; -import org.redisson.RedisRunner.RedisProcess; import org.redisson.api.Entry; import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; +import org.redisson.api.redisnode.RedisCluster; +import org.redisson.api.redisnode.RedisClusterMaster; +import org.redisson.api.redisnode.RedisNodes; +import org.redisson.client.RedisClient; +import org.redisson.client.RedisClientConfig; +import org.redisson.client.protocol.RedisCommands; import org.redisson.config.Config; import org.redisson.connection.balancer.RandomLoadBalancer; +import org.testcontainers.containers.GenericContainer; import java.io.IOException; import java.time.Duration; @@ -42,26 +46,19 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { } @Test - public void testPollWithBrokenConnection() throws IOException, InterruptedException, ExecutionException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .randomPort() - .run(); - - Config config = new Config(); - config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort()); + public void testPollWithBrokenConnection() throws InterruptedException, ExecutionException { + GenericContainer redis = createRedis(); + redis.start(); + + Config config = createConfig(redis); RedissonClient redisson = Redisson.create(config); - final RBlockingQueue queue1 = getQueue(redisson); + RBlockingQueue queue1 = getQueue(redisson); RFuture f = queue1.pollAsync(5, TimeUnit.SECONDS); - try { + Assertions.assertThrows(TimeoutException.class, () -> { f.toCompletableFuture().get(1, TimeUnit.SECONDS); - Assertions.fail(); - } catch (TimeoutException e) { - // skip - } - runner.stop(); + }); + redis.stop(); long start = System.currentTimeMillis(); assertThat(f.get()).isNull(); @@ -79,17 +76,12 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { } @Test - public void testPollReattach() throws InterruptedException, IOException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .randomPort() - .requirepass("1234") - .run(); - - Config config = new Config(); - config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort()) - .setPassword("1234"); + public void testPollReattach() throws InterruptedException { + GenericContainer redis = createRedis("latest","--requirepass", "1234"); + redis.start(); + + Config config = createConfig(redis); + config.useSingleServer().setPassword("1234"); RedissonClient redisson = Redisson.create(config); final AtomicBoolean executed = new AtomicBoolean(); @@ -112,15 +104,9 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { t.start(); t.join(1000); - runner.stop(); - runner = new RedisRunner() - .port(runner.getRedisServerPort()) - .nosave() - .randomDir() - .requirepass("1234") - .run(); - + restart(redis); + Thread.sleep(1000); RBlockingQueue queue1 = getQueue(redisson); @@ -131,19 +117,15 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { await().atMost(7, TimeUnit.SECONDS).untilTrue(executed); redisson.shutdown(); - runner.stop(); + redis.stop(); } @Test public void testPollAsyncReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .randomPort() - .run(); - - Config config = new Config(); - config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort()); + GenericContainer redis = createRedis(); + redis.start(); + + Config config = createConfig(redis); RedissonClient redisson = Redisson.create(config); RBlockingQueue queue1 = getQueue(redisson); @@ -153,13 +135,9 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { } catch (ExecutionException | TimeoutException e) { // skip } - runner.stop(); - runner = new RedisRunner() - .port(runner.getRedisServerPort()) - .nosave() - .randomDir() - .run(); + restart(redis); + queue1.put(123); // check connection rotation @@ -172,68 +150,63 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { assertThat(result).isEqualTo(123); redisson.shutdown(); - runner.stop(); + redis.stop(); } @Test - public void testTakeReattachCluster() throws IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); - RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); - RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterProcesses process = clusterRunner.run(); - - Thread.sleep(1000); - - Config config = new Config(); - config.useClusterServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get(); + public void testTakeReattachCluster() { + withNewCluster(redisson -> { + List> futures = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + RBlockingQueue queue = redisson.getBlockingQueue("queue" + i); + RFuture f = queue.takeAsync(); + futures.add(f); + } - List> futures = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - RBlockingQueue queue = redisson.getBlockingQueue("queue" + i); - RFuture f = queue.takeAsync(); try { - f.toCompletableFuture().get(1, TimeUnit.SECONDS); - } catch (ExecutionException | TimeoutException e) { - // skip + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - futures.add(f); - } - master.stop(); + RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER); + Optional ff = rnc.getMasters().stream().findFirst(); + RedisClusterMaster master = ff.get(); + RedisClientConfig cc = new RedisClientConfig(); + cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort()); + RedisClient c = RedisClient.create(cc); + c.connect().async(RedisCommands.SHUTDOWN); + c.shutdown(); - Thread.sleep(TimeUnit.SECONDS.toMillis(80)); + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(30)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - for (int i = 0; i < 10; i++) { - RBlockingQueue queue = redisson.getBlockingQueue("queue" + i); - queue.put(i*100); - } + for (int i = 0; i < 10; i++) { + RBlockingQueue queue = redisson.getBlockingQueue("queue" + i); + try { + queue.put(i*100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } - for (int i = 0; i < 10; i++) { - RFuture f = futures.get(i); - try { - f.toCompletableFuture().get(20, TimeUnit.SECONDS); - } catch (ExecutionException | TimeoutException e) { - // skip + for (int i = 0; i < 10; i++) { + RFuture f = futures.get(i); + try { + f.toCompletableFuture().get(20, TimeUnit.SECONDS); + } catch (Exception e) { + // skip + } + Integer result = f.toCompletableFuture().getNow(null); + assertThat(result).isEqualTo(i*100); } - Integer result = f.toCompletableFuture().getNow(null); - assertThat(result).isEqualTo(i*100); - } - redisson.shutdown(); - process.shutdown(); + redisson.shutdown(); + + }); } @Test @@ -319,14 +292,10 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { @Test public void testTakeReattach() throws Exception { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .randomPort() - .run(); - - Config config = new Config(); - config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort()); + GenericContainer redis = createRedis(); + redis.start(); + + Config config = createConfig(redis); RedissonClient redisson = Redisson.create(config); RBlockingQueue queue1 = getQueue(redisson); @@ -336,13 +305,8 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { } catch (ExecutionException | TimeoutException e) { e.printStackTrace(); } - runner.stop(); - runner = new RedisRunner() - .port(runner.getRedisServerPort()) - .nosave() - .randomDir() - .run(); + restart(redis); queue1.put(123); // check connection rotation @@ -353,9 +317,9 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { Integer result = f.get(1, TimeUnit.SECONDS); assertThat(result).isEqualTo(123); - runner.stop(); - + redisson.shutdown(); + redis.stop(); } @Test @@ -444,48 +408,26 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { } @Test - public void testPollFromAnyInCluster() throws Exception { - RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); - RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); - RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterProcesses process = clusterRunner.run(); - - Thread.sleep(5000); - - Config config = new Config(); - config.useClusterServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); - Executors.newSingleThreadScheduledExecutor().schedule(() -> { - RBlockingQueue queue2 = redisson.getBlockingQueue("queue:pollany1"); - RBlockingQueue queue3 = redisson.getBlockingQueue("queue:pollany2"); - try { - queue3.put(2); - queue1.put(1); - queue2.put(3); - } catch (InterruptedException e) { - Assertions.fail(); - } - }, 3, TimeUnit.SECONDS); + public void testPollFromAnyInCluster() { + testInCluster(redissonClient -> { + RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RBlockingQueue queue2 = redisson.getBlockingQueue("queue:pollany1"); + RBlockingQueue queue3 = redisson.getBlockingQueue("queue:pollany2"); + try { + queue3.put(2); + queue1.put(1); + queue2.put(3); + } catch (InterruptedException e) { + Assertions.fail(); + } + }, 3, TimeUnit.SECONDS); - Awaitility.await().between(Duration.ofSeconds(2), Duration.ofSeconds(4)).untilAsserted(() -> { - int value = queue1.pollFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2"); - assertThat(value).isEqualTo(1); + Awaitility.await().between(Duration.ofSeconds(2), Duration.ofSeconds(4)).untilAsserted(() -> { + int value = queue1.pollFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2"); + assertThat(value).isEqualTo(1); + }); }); - - redisson.shutdown(); - process.shutdown(); } @Test @@ -557,22 +499,18 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { @Test public void testPollLastFromAny() throws InterruptedException { - Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0); - RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); RBlockingQueue queue2 = redisson.getBlockingQueue("queue:pollany1"); RBlockingQueue queue3 = redisson.getBlockingQueue("queue:pollany2"); - Assertions.assertDoesNotThrow(() -> { - queue3.put(1); - queue3.put(2); - queue3.put(3); - queue1.put(4); - queue1.put(5); - queue1.put(6); - queue2.put(7); - queue2.put(8); - queue2.put(9); - }); + queue3.put(1); + queue3.put(2); + queue3.put(3); + queue1.put(4); + queue1.put(5); + queue1.put(6); + queue2.put(7); + queue2.put(8); + queue2.put(9); Map> res = queue1.pollLastFromAny(Duration.ofSeconds(4), 2, "queue:pollany1", "queue:pollany2"); assertThat(res.get("queue:pollany")).containsExactly(6, 5); diff --git a/redisson/src/test/java/org/redisson/RedissonLockExpirationRenewalTest.java b/redisson/src/test/java/org/redisson/RedissonLockExpirationRenewalTest.java index cd7f94a9b..25466db1c 100644 --- a/redisson/src/test/java/org/redisson/RedissonLockExpirationRenewalTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLockExpirationRenewalTest.java @@ -1,78 +1,49 @@ package org.redisson; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; - -import java.io.IOException; +import org.testcontainers.containers.GenericContainer; import static org.assertj.core.api.Assertions.assertThatThrownBy; -public class RedissonLockExpirationRenewalTest { +public class RedissonLockExpirationRenewalTest extends RedisDockerTest { private static final String LOCK_KEY = "LOCK_KEY"; public static final long LOCK_WATCHDOG_TIMEOUT = 1_000L; - private RedissonClient redisson; - - @BeforeEach - public void before() throws IOException, InterruptedException { - RedisRunner.startDefaultRedisServerInstance(); - redisson = createInstance(); - } - - @AfterEach - public void after() throws InterruptedException { - redisson.shutdown(); - RedisRunner.shutDownDefaultRedisServerInstance(); - } - @Test - public void testExpirationRenewalIsWorkingAfterTimeout() throws IOException, InterruptedException { - { - RLock lock = redisson.getLock(LOCK_KEY); - lock.lock(); - try { - // force expiration renewal error - restartRedisServer(); - // wait for timeout - Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2); - } finally { - assertThatThrownBy(lock::unlock).isInstanceOf(IllegalMonitorStateException.class); - } + public void testExpirationRenewalIsWorkingAfterTimeout() throws InterruptedException { + GenericContainer redis = createRedis(); + redis.start(); + + Config c = createConfig(redis); + c.setLockWatchdogTimeout(LOCK_WATCHDOG_TIMEOUT); + RedissonClient redisson = Redisson.create(c); + + RLock lock = redisson.getLock(LOCK_KEY); + lock.lock(); + try { + // force expiration renewal error + restart(redis); + // wait for timeout + Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2); + } finally { + assertThatThrownBy(lock::unlock).isInstanceOf(IllegalMonitorStateException.class); } - { - RLock lock = redisson.getLock(LOCK_KEY); - lock.lock(); - try { - // wait for timeout - Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2); - } finally { - lock.unlock(); - } + RLock lock2 = redisson.getLock(LOCK_KEY); + lock2.lock(); + try { + // wait for timeout + Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2); + } finally { + lock2.unlock(); } - } - - private void restartRedisServer() throws InterruptedException, IOException { - int currentPort = RedisRunner.defaultRedisInstance.getRedisServerPort(); - RedisRunner.shutDownDefaultRedisServerInstance(); - RedisRunner.defaultRedisInstance = new RedisRunner().nosave().randomDir().port(currentPort).run(); - } - public static Config createConfig() { - Config config = new Config(); - config.useSingleServer() - .setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); - config.setLockWatchdogTimeout(LOCK_WATCHDOG_TIMEOUT); - return config; + redisson.shutdown(); + redis.stop(); } - public static RedissonClient createInstance() { - Config config = createConfig(); - return Redisson.create(config); - } } diff --git a/redisson/src/test/java/org/redisson/RedissonLockHeavyTest.java b/redisson/src/test/java/org/redisson/RedissonLockHeavyTest.java index 1f0f71dc5..4a0222b60 100644 --- a/redisson/src/test/java/org/redisson/RedissonLockHeavyTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLockHeavyTest.java @@ -14,7 +14,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -public class RedissonLockHeavyTest extends BaseTest { +public class RedissonLockHeavyTest extends RedisDockerTest { public static Collection data() { return Arrays.asList(Arguments.of(2, 5000), diff --git a/redisson/src/test/java/org/redisson/RedissonPriorityBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonPriorityBlockingQueueTest.java index 254face84..c26875866 100644 --- a/redisson/src/test/java/org/redisson/RedissonPriorityBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonPriorityBlockingQueueTest.java @@ -1,20 +1,20 @@ package org.redisson; -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.redisson.RedisRunner.RedisProcess; import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; import org.redisson.config.Config; +import org.testcontainers.containers.GenericContainer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Assertions.assertThat; public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest { @@ -34,15 +34,11 @@ public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest } @Test - public void testPollAsyncReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .randomPort() - .run(); - - Config config = new Config(); - config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort()); + public void testPollAsyncReattach() throws InterruptedException, ExecutionException { + GenericContainer redis = createRedis(); + redis.start(); + + Config config = createConfig(redis); RedissonClient redisson = Redisson.create(config); RBlockingQueue queue1 = getQueue(redisson); @@ -52,13 +48,10 @@ public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest } catch (ExecutionException | TimeoutException e) { // skip } - runner.stop(); + redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":6379")); + redis.stop(); - runner = new RedisRunner() - .port(runner.getRedisServerPort()) - .nosave() - .randomDir() - .run(); + redis.start(); queue1.put(123); // check connection rotation @@ -71,20 +64,17 @@ public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest assertThat(result).isEqualTo(123); redisson.shutdown(); - runner.stop(); + redis.stop(); } @Test public void testTakeReattach() throws Exception { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .randomPort() - .run(); - - Config config = new Config(); - config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort()); + GenericContainer redis = createRedis(); + redis.start(); + + Config config = createConfig(redis); RedissonClient redisson = Redisson.create(config); + RBlockingQueue queue1 = getQueue(redisson); RFuture f = queue1.takeAsync(); try { @@ -92,13 +82,10 @@ public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest } catch (ExecutionException | TimeoutException e) { // skip } - runner.stop(); + redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":6379")); + redis.stop(); - runner = new RedisRunner() - .port(runner.getRedisServerPort()) - .nosave() - .randomDir() - .run(); + redis.start(); queue1.put(123); // check connection rotation @@ -109,9 +96,9 @@ public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest Integer result = f.get(); assertThat(result).isEqualTo(123); assertThat(queue1.size()).isEqualTo(10); - runner.stop(); - + redisson.shutdown(); + redis.stop(); } diff --git a/redisson/src/test/java/org/redisson/RedissonQueueTest.java b/redisson/src/test/java/org/redisson/RedissonQueueTest.java index b55b418ec..9da2aed51 100644 --- a/redisson/src/test/java/org/redisson/RedissonQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonQueueTest.java @@ -14,7 +14,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.redisson.api.RQueue; -public class RedissonQueueTest extends BaseTest { +public class RedissonQueueTest extends RedisDockerTest { RQueue getQueue() { return redisson.getQueue("queue"); diff --git a/redisson/src/test/java/org/redisson/RedissonRedisNodesTest.java b/redisson/src/test/java/org/redisson/RedissonRedisNodesTest.java index 3ff4e5d79..cc2b0508a 100644 --- a/redisson/src/test/java/org/redisson/RedissonRedisNodesTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRedisNodesTest.java @@ -1,5 +1,6 @@ package org.redisson; +import com.github.dockerjava.api.model.ContainerNetwork; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.redisson.api.RedissonClient; @@ -7,12 +8,8 @@ import org.redisson.api.redisnode.RedisNodes; import org.redisson.api.redisnode.*; import org.redisson.client.protocol.Time; import org.redisson.cluster.ClusterSlotRange; -import org.redisson.config.Config; -import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.misc.RedisURI; -import java.io.IOException; -import java.net.InetSocketAddress; import java.util.List; import java.util.Map; import java.util.Set; @@ -24,7 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Nikita Koksharov * */ -public class RedissonRedisNodesTest extends BaseTest { +public class RedissonRedisNodesTest extends RedisDockerTest { @Test public void testNode() { @@ -100,246 +97,110 @@ public class RedissonRedisNodesTest extends BaseTest { } @Test - public void testSentinelFailover() throws IOException, InterruptedException { - RedisRunner.RedisProcess master = new RedisRunner() - .nosave() - .randomDir() - .run(); - RedisRunner.RedisProcess slave1 = new RedisRunner() - .port(6380) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6379) - .run(); - RedisRunner.RedisProcess slave2 = new RedisRunner() - .port(6381) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6379) - .run(); - RedisRunner.RedisProcess sentinel1 = new RedisRunner() - .nosave() - .randomDir() - .port(26379) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) - .run(); - RedisRunner.RedisProcess sentinel2 = new RedisRunner() - .nosave() - .randomDir() - .port(26380) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) - .run(); - RedisRunner.RedisProcess sentinel3 = new RedisRunner() - .nosave() - .randomDir() - .port(26381) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) - .run(); - - Config config = new Config(); - config.useSentinelServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster"); - - long t = System.currentTimeMillis(); - RedissonClient redisson = Redisson.create(config); - - RedisSentinelMasterSlave nodes = redisson.getRedisNodes(RedisNodes.SENTINEL_MASTER_SLAVE); - RedisSentinel sentinel = nodes.getSentinels().iterator().next(); - sentinel.failover("myMaster"); - - redisson.shutdown(); - - sentinel1.stop(); - sentinel2.stop(); - sentinel3.stop(); - master.stop(); - slave1.stop(); - slave2.stop(); + public void testSentinelFailover() throws InterruptedException { + withSentinel((nns, config) -> { + RedissonClient redisson = Redisson.create(config); + RedisSentinelMasterSlave nodes = redisson.getRedisNodes(RedisNodes.SENTINEL_MASTER_SLAVE); + RedisSentinel sentinel = nodes.getSentinels().iterator().next(); + sentinel.failover(config.useSentinelServers().getMasterName()); + + redisson.shutdown(); + }, 2); } @Test - public void testCluster() throws Exception { - RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); - RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); - RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); - RedisRunner slave4 = new RedisRunner().port(6903).randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1, slave4) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterRunner.ClusterProcesses process = clusterRunner.run(); - - Thread.sleep(5000); - - Config config = new Config(); - config.useClusterServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); - assertThat(nodes.getMaster("redis://127.0.0.1:6890")).isNotNull(); - assertThat(nodes.getMaster("redis://127.0.0.1:6899")).isNull(); - assertThat(nodes.getMasters()).hasSize(3); - assertThat(nodes.getSlaves()).hasSize(4); - - for (RedisClusterMaster master : nodes.getMasters()) { - master.clusterDeleteSlots(1, 2); - master.clusterAddSlots(1, 2); - master.clusterCountKeysInSlot(1); - List keys = master.clusterGetKeysInSlot(1, 10); - assertThat(keys).isEmpty();; - String nodeId = master.clusterId(); - assertThat(nodeId).isNotNull(); - - assertThat(master.clusterCountFailureReports(nodeId)).isZero(); - Map> slots = master.clusterSlots(); - assertThat(slots.entrySet().size()).isBetween(3, 5); - } - for (RedisClusterSlave slave : nodes.getSlaves()) { - slave.clusterDeleteSlots(1, 2); - slave.clusterAddSlots(1, 2); - slave.clusterCountKeysInSlot(1); - List keys = slave.clusterGetKeysInSlot(1, 10); - assertThat(keys).isEmpty();; - String nodeId = slave.clusterId(); - assertThat(nodeId).isNotNull(); - - assertThat(slave.clusterCountFailureReports(nodeId)).isZero(); - Map> slots = slave.clusterSlots(); - assertThat(slots.entrySet().size()).isBetween(3, 5); - } - redisson.shutdown(); - process.shutdown(); + public void testCluster() { + testInCluster(redisson -> { + RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); + RedisClusterMaster n = nodes.getMasters().iterator().next(); + assertThat(nodes.getMaster("redis://" + n.getAddr().getHostString() + ":" + n.getAddr().getPort())).isNotNull(); + assertThat(nodes.getMaster("redis://127.0.0.1:6899")).isNull(); + assertThat(nodes.getMasters()).hasSize(3); + assertThat(nodes.getSlaves()).hasSize(3); + + for (RedisClusterMaster master : nodes.getMasters()) { + master.clusterDeleteSlots(1, 2); + master.clusterAddSlots(1, 2); + master.clusterCountKeysInSlot(1); + List keys = master.clusterGetKeysInSlot(1, 10); + assertThat(keys).isEmpty();; + String nodeId = master.clusterId(); + assertThat(nodeId).isNotNull(); + + assertThat(master.clusterCountFailureReports(nodeId)).isZero(); + Map> slots = master.clusterSlots(); + assertThat(slots.entrySet().size()).isBetween(3, 5); + } + for (RedisClusterSlave slave : nodes.getSlaves()) { + slave.clusterDeleteSlots(1, 2); + slave.clusterAddSlots(1, 2); + slave.clusterCountKeysInSlot(1); + List keys = slave.clusterGetKeysInSlot(1, 10); + assertThat(keys).isEmpty();; + String nodeId = slave.clusterId(); + assertThat(nodeId).isNotNull(); + + assertThat(slave.clusterCountFailureReports(nodeId)).isZero(); + Map> slots = slave.clusterSlots(); + assertThat(slots.entrySet().size()).isBetween(3, 5); + } + }); } @Test - public void testSentinel() throws IOException, InterruptedException { - RedisRunner.RedisProcess master = new RedisRunner() - .nosave() - .randomDir() - .run(); - RedisRunner.RedisProcess slave1 = new RedisRunner() - .port(6380) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6379) - .run(); - RedisRunner.RedisProcess slave2 = new RedisRunner() - .port(6381) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6379) - .run(); - RedisRunner.RedisProcess sentinel1 = new RedisRunner() - .nosave() - .randomDir() - .port(26379) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) - .run(); - RedisRunner.RedisProcess sentinel2 = new RedisRunner() - .nosave() - .randomDir() - .port(26380) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) - .run(); - RedisRunner.RedisProcess sentinel3 = new RedisRunner() - .nosave() - .randomDir() - .port(26381) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) - .run(); - - Config config = new Config(); - config.useSentinelServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster"); - - long t = System.currentTimeMillis(); - RedissonClient redisson = Redisson.create(config); - - RedisSentinelMasterSlave nodes = redisson.getRedisNodes(RedisNodes.SENTINEL_MASTER_SLAVE); - assertThat(nodes.getSentinels()).hasSize(3); - assertThat(nodes.getSlaves()).hasSize(2); - assertThat(nodes.getMaster()).isNotNull(); - - for (RedisSentinel sentinel : nodes.getSentinels()) { - Assertions.assertTrue(sentinel.ping()); - RedisURI addr = sentinel.getMasterAddr("myMaster"); - assertThat(addr.getHost()).isEqualTo("127.0.0.1"); - assertThat(addr.getPort()).isEqualTo(master.getRedisServerPort()); - - Map masterMap = sentinel.getMaster("myMaster"); - assertThat(masterMap).isNotEmpty(); - - List> masters = sentinel.getMasters(); - assertThat(masters).hasSize(1); - Map m = masters.get(0); - assertThat(m.get("ip")).isEqualTo("127.0.0.1"); - assertThat(Integer.valueOf(m.get("port"))).isEqualTo(master.getRedisServerPort()); - - List> slaves = sentinel.getSlaves("myMaster"); - assertThat(slaves).hasSize(2); - } - nodes.getSlaves().forEach((node) -> { - Assertions.assertTrue(node.ping()); - }); - - redisson.shutdown(); - - sentinel1.stop(); - sentinel2.stop(); - sentinel3.stop(); - master.stop(); - slave1.stop(); - slave2.stop(); + public void testSentinel() throws InterruptedException { + withSentinel((nns, config) -> { + RedissonClient redisson = Redisson.create(config); + + RedisSentinelMasterSlave nodes = redisson.getRedisNodes(RedisNodes.SENTINEL_MASTER_SLAVE); + assertThat(nodes.getSentinels()).hasSize(3); + assertThat(nodes.getSlaves()).hasSize(2); + assertThat(nodes.getMaster()).isNotNull(); + + for (RedisSentinel sentinel : nodes.getSentinels()) { + Assertions.assertTrue(sentinel.ping()); + RedisURI addr = sentinel.getMasterAddr(config.useSentinelServers().getMasterName()); + + Map ss = nns.get(0).getContainerInfo().getNetworkSettings().getNetworks(); + ContainerNetwork s = ss.values().iterator().next(); + Integer port = nns.get(0).getExposedPorts().get(0); + + assertThat(addr.getHost()).isEqualTo(s.getIpAddress()); + assertThat(addr.getPort()).isEqualTo(port); + + Map masterMap = sentinel.getMaster(config.useSentinelServers().getMasterName()); + assertThat(masterMap).isNotEmpty(); + + List> masters = sentinel.getMasters(); + assertThat(masters).hasSize(1); + Map m = masters.get(0); + assertThat(m.get("ip")).isEqualTo(s.getIpAddress()); + assertThat(Integer.valueOf(m.get("port"))).isEqualTo(port); + + List> slaves = sentinel.getSlaves(config.useSentinelServers().getMasterName()); + assertThat(slaves).hasSize(2); + } + nodes.getSlaves().forEach((node) -> { + Assertions.assertTrue(node.ping()); + }); + + redisson.shutdown(); + }, 2); } - @Test - public void testNodesInCluster() throws Exception { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slot1) - .addNode(master2, slot2) - .addNode(master3, slot3); - ClusterRunner.ClusterProcesses process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); - assertThat(nodes.getMasters()).hasSize(3); - for (RedisClusterMaster node : nodes.getMasters()) { - assertThat(node.info(RedisNode.InfoSection.ALL)).isNotEmpty(); - } - assertThat(nodes.getSlaves()).hasSize(3); - for (RedisClusterSlave node : nodes.getSlaves()) { - assertThat(node.info(RedisNode.InfoSection.ALL)).isNotEmpty(); - } - - redisson.shutdown(); - process.shutdown(); + public void testNodesInCluster() { + testInCluster(redisson -> { + RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); + assertThat(nodes.getMasters()).hasSize(3); + for (RedisClusterMaster node : nodes.getMasters()) { + assertThat(node.info(RedisNode.InfoSection.ALL)).isNotEmpty(); + } + assertThat(nodes.getSlaves()).hasSize(3); + for (RedisClusterSlave node : nodes.getSlaves()) { + assertThat(node.info(RedisNode.InfoSection.ALL)).isNotEmpty(); + } + }); } @Test diff --git a/redisson/src/test/java/org/redisson/RedissonSetTest.java b/redisson/src/test/java/org/redisson/RedissonSetTest.java index fafab23db..e51617d08 100644 --- a/redisson/src/test/java/org/redisson/RedissonSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSetTest.java @@ -1,36 +1,22 @@ package org.redisson; -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import org.assertj.core.api.ListAssert; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner.FailedToStartRedisException; import org.redisson.api.RFuture; import org.redisson.api.RList; import org.redisson.api.RSet; -import org.redisson.api.RedissonClient; import org.redisson.api.SortOrder; import org.redisson.client.codec.IntegerCodec; -import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; -import org.redisson.config.Config; -import org.redisson.connection.balancer.RandomLoadBalancer; + +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; public class RedissonSetTest extends RedisDockerTest { diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index 188bc485f..5b2e26e58 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -8,8 +8,6 @@ import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; -import org.redisson.ClusterRunner.ClusterProcesses; -import org.redisson.RedisRunner.RedisProcess; import org.redisson.api.*; import org.redisson.api.redisnode.RedisClusterMaster; import org.redisson.api.redisnode.RedisMaster; @@ -27,11 +25,17 @@ import org.redisson.cluster.ClusterNodeInfo; import org.redisson.cluster.ClusterNodeInfo.Flag; import org.redisson.codec.JsonJacksonCodec; import org.redisson.codec.SerializationCodec; -import org.redisson.config.*; +import org.redisson.config.Config; +import org.redisson.config.ConfigSupport; +import org.redisson.config.Credentials; +import org.redisson.config.CredentialsResolver; import org.redisson.connection.CRC16; import org.redisson.connection.ConnectionListener; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.balancer.RandomLoadBalancer; +import org.redisson.misc.RedisURI; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.containers.GenericContainer; import java.io.IOException; import java.net.InetSocketAddress; @@ -46,7 +50,7 @@ import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -public class RedissonTest extends BaseTest { +public class RedissonTest extends RedisDockerTest { @Test public void testVirtualThreads() { @@ -110,7 +114,7 @@ public class RedissonTest extends BaseTest { } @Test - public void testLazyInitialization() throws IOException, InterruptedException { + public void testLazyInitialization() { Config config = new Config(); config.setLazyInitialization(true); config.useSingleServer() @@ -125,16 +129,14 @@ public class RedissonTest extends BaseTest { redisson.getStream("test").createGroup(StreamCreateGroupArgs.name("test").makeStream()); }); - RedisProcess pp = new RedisRunner() - .nosave() - .port(4431) - .randomDir() - .run(); + FixedHostPortGenericContainer c = new FixedHostPortGenericContainer("redis:7.2") + .withFixedExposedPort(4431, 6379); + c.start(); redisson.getStream("test").createGroup(StreamCreateGroupArgs.name("test").makeStream()); redisson.shutdown(); - pp.stop(); + c.stop(); } @Test @@ -152,20 +154,19 @@ public class RedissonTest extends BaseTest { } ex.shutdown(); - assertThat(ex.awaitTermination(8, TimeUnit.SECONDS)).isTrue(); + assertThat(ex.awaitTermination(20, TimeUnit.SECONDS)).isTrue(); inst.shutdown(); } @Test public void testResponseHandling2() throws InterruptedException { - Config config = new Config(); + Config config = createConfig(); config.useSingleServer() .setTimeout(10) .setRetryAttempts(0) .setConnectionPoolSize(1) .setConnectionMinimumIdleSize(1) - .setPingConnectionInterval(0) - .setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); + .setPingConnectionInterval(0); RedissonClient redisson = Redisson.create(config); @@ -225,7 +226,7 @@ public class RedissonTest extends BaseTest { }); } e.shutdown(); - assertThat(e.awaitTermination(40, TimeUnit.SECONDS)).isTrue(); + assertThat(e.awaitTermination(70, TimeUnit.SECONDS)).isTrue(); assertThat(counter.get()).isEqualTo(10000 * 100); } @@ -247,11 +248,10 @@ public class RedissonTest extends BaseTest { @Test public void testSmallPool() throws InterruptedException { - Config config = new Config(); + Config config = createConfig(); config.useSingleServer() - .setConnectionMinimumIdleSize(3) - .setConnectionPoolSize(3) - .setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); + .setConnectionMinimumIdleSize(3) + .setConnectionPoolSize(3); RedissonClient localRedisson = Redisson.create(config); @@ -283,12 +283,11 @@ public class RedissonTest extends BaseTest { } @Test - public void testNextResponseAfterDecoderError() throws Exception { - Config config = new Config(); + public void testNextResponseAfterDecoderError() { + Config config = createConfig(); config.useSingleServer() .setConnectionMinimumIdleSize(1) - .setConnectionPoolSize(1) - .setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); + .setConnectionPoolSize(1); RedissonClient redisson = Redisson.create(config); @@ -299,7 +298,7 @@ public class RedissonTest extends BaseTest { RBuckets buckets = redisson.getBuckets(new JsonJacksonCodec()); buckets.get("test2", "test1"); } catch (Exception e) { - e.printStackTrace(); + // skip } assertThat(getStringValue(redisson, "test3")).isEqualTo("\"test3\""); @@ -324,81 +323,76 @@ public class RedissonTest extends BaseTest { @Test public void testSer() { - Config config = new Config(); - config.useSingleServer().setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); + Config config = createConfig(); config.setCodec(new SerializationCodec()); + RedissonClient r = Redisson.create(config); Assertions.assertThrows(IllegalArgumentException.class, () -> { - RedissonClient r = Redisson.create(config); r.getMap("test").put("1", new Dummy()); }); + r.shutdown(); } @Test - public void testMemoryScript() throws IOException, InterruptedException { - RedisProcess p = redisTestSmallMemory(); + public void testMemoryScript() { + testWithParams(redissonClient -> { + Config c = redissonClient.getConfig(); + c.useSingleServer().setTimeout(100000); - Config config = new Config(); - config.useSingleServer().setAddress(p.getRedisServerAddressAndPort()).setTimeout(100000); - - Assertions.assertThrows(RedisOutOfMemoryException.class, () -> { - RedissonClient r = null; - try { - r = Redisson.create(config); - r.getKeys().flushall(); - for (int i = 0; i < 10000; i++) { - r.getMap("test").put("" + i, "" + i); + Assertions.assertThrows(RedisOutOfMemoryException.class, () -> { + RedissonClient r = null; + try { + r = Redisson.create(c); + for (int i = 0; i < 10000; i++) { + r.getMap("test").put("" + i, "" + i); + } + } finally { + r.shutdown(); } - } finally { - r.shutdown(); - p.stop(); - } - }); + }); + }, "--maxmemory", "1mb"); + } @Test - public void testMemoryCommand() throws IOException, InterruptedException { - RedisProcess p = redisTestSmallMemory(); + public void testMemoryCommand() { + testWithParams(redissonClient -> { + Config c = redissonClient.getConfig(); + c.useSingleServer().setTimeout(100000); - Config config = new Config(); - config.useSingleServer().setAddress(p.getRedisServerAddressAndPort()).setTimeout(100000); - - Assertions.assertThrows(RedisOutOfMemoryException.class, () -> { - RedissonClient r = null; - try { - r = Redisson.create(config); - r.getKeys().flushall(); - for (int i = 0; i < 10000; i++) { - r.getMap("test").fastPut("" + i, "" + i); + Assertions.assertThrows(RedisOutOfMemoryException.class, () -> { + RedissonClient r = null; + try { + r = Redisson.create(c); + for (int i = 0; i < 10000; i++) { + r.getMap("test").fastPut("" + i, "" + i); + } + } finally { + r.shutdown(); } - } finally { - r.shutdown(); - p.stop(); - } - }); + }); + }, "--maxmemory", "1mb"); } @Test public void testConfigValidation() { Assertions.assertThrows(IllegalArgumentException.class, () -> { - Config redissonConfig = new Config(); + Config redissonConfig = createConfig(); redissonConfig.useSingleServer() - .setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()) .setConnectionPoolSize(2); Redisson.create(redissonConfig); }); } @Test - public void testConnectionListener() throws IOException, InterruptedException, TimeoutException { - - final RedisProcess p = redisTestConnection(); + public void testConnectionListener() { + GenericContainer redis = createRedis(); + redis.start(); final AtomicInteger connectCounter = new AtomicInteger(); final AtomicInteger disconnectCounter = new AtomicInteger(); - Config config = new Config(); - config.useSingleServer().setAddress(p.getRedisServerAddressAndPort()); + Config config = createConfig(redis); config.setConnectionListener(new ConnectionListener() { @Override @@ -407,7 +401,7 @@ public class RedissonTest extends BaseTest { @Override public void onDisconnect(InetSocketAddress addr, NodeType nodeType) { - assertThat(addr).isEqualTo(new InetSocketAddress(p.getRedisServerBindAddress(), p.getRedisServerPort())); + assertThat(addr).isEqualTo(new InetSocketAddress(redis.getHost(), redis.getFirstMappedPort())); assertThat(nodeType).isEqualTo(NodeType.MASTER); disconnectCounter.incrementAndGet(); } @@ -418,7 +412,7 @@ public class RedissonTest extends BaseTest { @Override public void onConnect(InetSocketAddress addr, NodeType nodeType) { - assertThat(addr).isEqualTo(new InetSocketAddress(p.getRedisServerBindAddress(), p.getRedisServerPort())); + assertThat(addr).isEqualTo(new InetSocketAddress(redis.getHost(), redis.getFirstMappedPort())); assertThat(nodeType).isEqualTo(NodeType.MASTER); connectCounter.incrementAndGet(); } @@ -427,23 +421,21 @@ public class RedissonTest extends BaseTest { RedissonClient r = Redisson.create(config); r.getBucket("1").get(); - Assertions.assertEquals(0, p.stop()); + redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":6379")); + redis.stop(); await().atMost(2, TimeUnit.SECONDS).until(() -> disconnectCounter.get() == 1); - + try { r.getBucket("1").get(); } catch (Exception e) { + // skip } - + assertThat(connectCounter.get()).isEqualTo(1); assertThat(disconnectCounter.get()).isEqualTo(1); - RedisProcess pp = new RedisRunner() - .nosave() - .port(p.getRedisServerPort()) - .randomDir() - .run(); + redis.start(); r.getBucket("1").get(); @@ -451,7 +443,7 @@ public class RedissonTest extends BaseTest { assertThat(disconnectCounter.get()).isEqualTo(1); r.shutdown(); - Assertions.assertEquals(0, pp.stop()); + redis.stop(); } public static class SlowCodec extends BaseCodec { @@ -501,28 +493,21 @@ public class RedissonTest extends BaseTest { } @Test - public void testReconnection() throws IOException, InterruptedException, TimeoutException { - RedisProcess runner = new RedisRunner() - .appendonly(true) - .randomDir() - .randomPort() - .run(); - - Config config = new Config(); + public void testReconnection() { + Config config = redisson.getConfig(); config.useSingleServer() .setConnectionMinimumIdleSize(20) .setConnectionPoolSize(20) .setSubscriptionConnectionMinimumIdleSize(20) - .setSubscriptionConnectionPoolSize(20) - .setAddress(runner.getRedisServerAddressAndPort()); + .setSubscriptionConnectionPoolSize(20); RedissonClient r = Redisson.create(config); r.getBucket("myBucket").set(1); assertThat(r.getBucket("myBucket").get()).isEqualTo(1); - - Assertions.assertEquals(0, runner.stop()); - + + REDIS.getDockerClient().pauseContainerCmd(REDIS.getContainerId()).exec(); + AtomicBoolean hasError = new AtomicBoolean(); try { r.getBucket("myBucket").get(); @@ -532,18 +517,12 @@ public class RedissonTest extends BaseTest { } assertThat(hasError.get()).isTrue(); - - RedisProcess pp = new RedisRunner() - .appendonly(true) - .port(runner.getRedisServerPort()) - .dir(runner.getDefaultDir()) - .run(); + + REDIS.getDockerClient().unpauseContainerCmd(REDIS.getContainerId()).exec(); assertThat(r.getBucket("myBucket").get()).isEqualTo(1); r.shutdown(); - - Assertions.assertEquals(0, pp.stop()); } @@ -565,10 +544,7 @@ public class RedissonTest extends BaseTest { @Test public void testShutdown() { - Config config = new Config(); - config.useSingleServer().setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); - - RedissonClient r = Redisson.create(config); + RedissonClient r = createInstance(); Assertions.assertFalse(r.isShuttingDown()); Assertions.assertFalse(r.isShutdown()); r.shutdown(); @@ -577,30 +553,22 @@ public class RedissonTest extends BaseTest { } @Test - public void testCredentials() throws IOException, InterruptedException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .randomPort() - .requirepass("1234") - .run(); - - Config config = new Config(); - config.useSingleServer() - .setCredentialsResolver(new CredentialsResolver() { - @Override - public CompletionStage resolve(InetSocketAddress address) { - return CompletableFuture.completedFuture(new Credentials(null, "1234")); - } - }) - .setAddress(runner.getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - RBucket b = redisson.getBucket("test"); - b.set("123"); + public void testCredentials() { + withRedisParams(config -> { + config.useSingleServer() + .setCredentialsResolver(new CredentialsResolver() { + @Override + public CompletionStage resolve(InetSocketAddress address) { + return CompletableFuture.completedFuture(new Credentials(null, "1234")); + } + }); - redisson.shutdown(); - runner.stop(); + RedissonClient redisson = Redisson.create(config); + RBucket b = redisson.getBucket("test"); + b.set("123"); + redisson.shutdown(); + }, "--requirepass", "1234"); } @Test @@ -617,29 +585,23 @@ public class RedissonTest extends BaseTest { RedisException e = Assertions.assertThrows(RedisException.class, () -> { b.compareAndSet("test", "v1"); }); - assertThat(e.getMessage()).startsWith("ERR unknown command `EVAL_111`"); + assertThat(e.getMessage()).startsWith("ERR unknown command 'EVAL_111'"); redisson.shutdown(); } @Test - public void testURIPassword() throws InterruptedException, IOException { - RedisProcess runner = new RedisRunner() - .nosave() - .randomDir() - .randomPort() - .requirepass("1234") - .run(); + public void testURIPassword() { + withRedisParams(config -> { + RedisURI ur = new RedisURI(config.useSingleServer().getAddress()); + config.useSingleServer() + .setAddress("redis://:1234@" + ur.getHost() + ":" + ur.getPort()); + RedissonClient redisson = Redisson.create(config); + RBucket b = redisson.getBucket("test"); + b.set("123"); - Config config = new Config(); - config.useSingleServer() - .setAddress("redis://:1234@" + runner.getRedisServerBindAddress() + ":" + runner.getRedisServerPort()); - RedissonClient redisson = Redisson.create(config); - RBucket b = redisson.getBucket("test"); - b.set("123"); - - redisson.shutdown(); - runner.stop(); + redisson.shutdown(); + }, "--requirepass", "1234"); } @Test @@ -716,67 +678,17 @@ public class RedissonTest extends BaseTest { @Test public void testSentinelStartup() throws Exception { - RedisRunner.RedisProcess master = new RedisRunner() - .nosave() - .randomDir() - .run(); - RedisRunner.RedisProcess slave1 = new RedisRunner() - .port(6380) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6379) - .run(); - RedisRunner.RedisProcess slave2 = new RedisRunner() - .port(6381) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6379) - .run(); - RedisRunner.RedisProcess sentinel1 = new RedisRunner() - .nosave() - .randomDir() - .port(26379) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) - .run(); - RedisRunner.RedisProcess sentinel2 = new RedisRunner() - .nosave() - .randomDir() - .port(26380) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) - .run(); - RedisRunner.RedisProcess sentinel3 = new RedisRunner() - .nosave() - .randomDir() - .port(26381) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) - .run(); - - Thread.sleep(5000); - - Config config = new Config(); - config.useSentinelServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster"); - - long t = System.currentTimeMillis(); - RedissonClient redisson = Redisson.create(config); - assertThat(System.currentTimeMillis() - t).isLessThan(2000L); - redisson.shutdown(); - - sentinel1.stop(); - sentinel2.stop(); - sentinel3.stop(); - master.stop(); - slave1.stop(); - slave2.stop(); + withSentinel((nodes, config) -> { + long t = System.currentTimeMillis(); + RedissonClient redisson = Redisson.create(config); + assertThat(System.currentTimeMillis() - t).isLessThan(2000L); + redisson.shutdown(); + }, 2); } @Test public void testSingleConfigYAML() throws IOException { - RedissonClient r = BaseTest.createInstance(); + RedissonClient r = createInstance(); String t = r.getConfig().toYAML(); Config c = Config.fromYAML(t); assertThat(c.toYAML()).isEqualTo(t); @@ -802,124 +714,102 @@ public class RedissonTest extends BaseTest { } @Test - public void testEvalCache() throws InterruptedException, IOException { - RedisRunner master1 = new RedisRunner().port(6896).randomDir().nosave(); - RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); - RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterRunner.ClusterProcesses process = clusterRunner.run(); + public void testEvalCache() { + testInCluster(redissonClient -> { + Config config = redissonClient.getConfig(); + config.setUseScriptCache(true); - Thread.sleep(5000); + RedissonClient redisson = Redisson.create(config); - Config config = new Config(); - config.setUseScriptCache(true); - config.useClusterServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); + RTimeSeries t = redisson.getTimeSeries("test"); + t.add(4, "40"); + t.add(2, "20"); + t.add(1, "10", 1, TimeUnit.SECONDS); - RTimeSeries t = redisson.getTimeSeries("test"); - t.add(4, "40"); - t.add(2, "20"); - t.add(1, "10", 1, TimeUnit.SECONDS); + t.size(); - t.size(); + redisson.shutdown(); + }); } @Test public void testMovedRedirectInCluster() throws Exception { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slot1) - .addNode(master2, slot2) - .addNode(master3, slot3); - ClusterProcesses process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setScanInterval(100000) - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - RedisClientConfig cfg = new RedisClientConfig(); - cfg.setAddress(process.getNodes().iterator().next().getRedisServerAddressAndPort()); - RedisClient c = RedisClient.create(cfg); - RedisConnection cc = c.connect(); - List cn = cc.sync(RedisCommands.CLUSTER_NODES); - c.shutdownAsync(); - cn = cn.stream().filter(i -> i.containsFlag(Flag.MASTER)).collect(Collectors.toList()); - Iterator nodesIter = cn.iterator(); - - - ClusterNodeInfo source = nodesIter.next(); - ClusterNodeInfo destination = nodesIter.next(); - - RedisClientConfig sourceCfg = new RedisClientConfig(); - sourceCfg.setAddress(source.getAddress()); - RedisClient sourceClient = RedisClient.create(sourceCfg); - RedisConnection sourceConnection = sourceClient.connect(); - - RedisClientConfig destinationCfg = new RedisClientConfig(); - destinationCfg.setAddress(destination.getAddress()); - RedisClient destinationClient = RedisClient.create(destinationCfg); - RedisConnection destinationConnection = destinationClient.connect(); - - String key = null; - int slot = 0; - for (int i = 0; i < 100000; i++) { - key = "" + i; - slot = CRC16.crc16(key.getBytes()) % MasterSlaveConnectionManager.MAX_SLOT; - if (source.getSlotRanges().iterator().next().getStartSlot() == slot) { - break; + withNewCluster(redissonClient -> { + Config config = redissonClient.getConfig(); + config.useClusterServers() + .setScanInterval(100000); + + RedissonClient redisson = Redisson.create(config); + + Collection ms = redisson.getRedisNodes(RedisNodes.CLUSTER).getMasters(); + RedisClusterMaster m = ms.iterator().next(); + RedisURI a = config.useClusterServers().getNatMapper().map( + new RedisURI("redis://" + m.getAddr().getHostString() + ":" + m.getAddr().getPort())); + RedisClientConfig cfg = new RedisClientConfig(); + cfg.setAddress(a); + RedisClient c = RedisClient.create(cfg); + RedisConnection cc = c.connect(); + List cn = cc.sync(RedisCommands.CLUSTER_NODES); + c.shutdownAsync(); + cn = cn.stream().filter(i -> i.containsFlag(Flag.MASTER)).collect(Collectors.toList()); + Iterator nodesIter = cn.iterator(); + + + ClusterNodeInfo source = nodesIter.next(); + ClusterNodeInfo destination = nodesIter.next(); + + RedisClientConfig sourceCfg = new RedisClientConfig(); + sourceCfg.setAddress(config.useClusterServers().getNatMapper().map(source.getAddress())); + RedisClient sourceClient = RedisClient.create(sourceCfg); + RedisConnection sourceConnection = sourceClient.connect(); + + RedisClientConfig destinationCfg = new RedisClientConfig(); + destinationCfg.setAddress(config.useClusterServers().getNatMapper().map(destination.getAddress())); + RedisClient destinationClient = RedisClient.create(destinationCfg); + RedisConnection destinationConnection = destinationClient.connect(); + + String key = null; + int slot = 0; + for (int i = 0; i < 100000; i++) { + key = "" + i; + slot = CRC16.crc16(key.getBytes()) % MasterSlaveConnectionManager.MAX_SLOT; + if (source.getSlotRanges().iterator().next().getStartSlot() == slot) { + break; + } } - } - - redisson.getBucket(key).set("123"); - destinationConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "IMPORTING", source.getNodeId()); - sourceConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "MIGRATING", destination.getNodeId()); - - List keys = sourceConnection.sync(RedisCommands.CLUSTER_GETKEYSINSLOT, source.getSlotRanges().iterator().next().getStartSlot(), 100); - List params = new ArrayList(); - params.add(destination.getAddress().getHost()); - params.add(destination.getAddress().getPort()); - params.add(""); - params.add(0); - params.add(2000); - params.add("KEYS"); - params.addAll(keys); - sourceConnection.async(RedisCommands.MIGRATE, params.toArray()); - - for (ClusterNodeInfo node : cn) { - RedisClientConfig cc1 = new RedisClientConfig(); - cc1.setAddress(node.getAddress()); - RedisClient ccc = RedisClient.create(cc1); - RedisConnection connection = ccc.connect(); - connection.sync(RedisCommands.CLUSTER_SETSLOT, slot, "NODE", destination.getNodeId()); - ccc.shutdownAsync(); - } - - redisson.getBucket(key).set("123"); - redisson.getBucket(key).get(); + redisson.getBucket(key).set("123"); + + destinationConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "IMPORTING", source.getNodeId()); + sourceConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "MIGRATING", destination.getNodeId()); + + List keys = sourceConnection.sync(RedisCommands.CLUSTER_GETKEYSINSLOT, source.getSlotRanges().iterator().next().getStartSlot(), 100); + List params = new ArrayList(); + params.add(destination.getAddress().getHost()); + params.add(destination.getAddress().getPort()); + params.add(""); + params.add(0); + params.add(2000); + params.add("KEYS"); + params.addAll(keys); + sourceConnection.async(RedisCommands.MIGRATE, params.toArray()); + + for (ClusterNodeInfo node : cn) { + RedisClientConfig cc1 = new RedisClientConfig(); + cc1.setAddress(config.useClusterServers().getNatMapper().map(node.getAddress())); + RedisClient ccc = RedisClient.create(cc1); + RedisConnection connection = ccc.connect(); + connection.sync(RedisCommands.CLUSTER_SETSLOT, slot, "NODE", destination.getNodeId()); + ccc.shutdownAsync(); + } - sourceClient.shutdown(); - destinationClient.shutdown(); - redisson.shutdown(); - process.shutdown(); + redisson.getBucket(key).set("123"); + redisson.getBucket(key).get(); + + sourceClient.shutdown(); + destinationClient.shutdown(); + redisson.shutdown(); + }); } @@ -995,31 +885,13 @@ public class RedissonTest extends BaseTest { @Test public void testManyConnections() { - Assumptions.assumeFalse(RedissonRuntimeEnvironment.isTravis); - Config redisConfig = new Config(); + Config redisConfig = createConfig(); redisConfig.useSingleServer() - .setConnectionMinimumIdleSize(5000) - .setConnectionPoolSize(5000) - .setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); + .setConnectionMinimumIdleSize(5000) + .setConnectionPoolSize(5000); + RedissonClient r = Redisson.create(redisConfig); r.shutdown(); } - private RedisProcess redisTestSmallMemory() throws IOException, InterruptedException { - return new RedisRunner() - .maxmemory("1mb") - .nosave() - .randomDir() - .randomPort() - .run(); - } - - private RedisProcess redisTestConnection() throws IOException, InterruptedException { - return new RedisRunner() - .nosave() - .randomDir() - .randomPort() - .run(); - } - } diff --git a/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java b/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java index da89932c3..885e6c542 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java @@ -404,6 +404,8 @@ public class RedissonTopicPatternTest extends RedisDockerTest { .addNode(master3, slave3); ClusterRunner.ClusterProcesses process = clusterRunner.run(); + Thread.sleep(7000); + Config config = new Config(); config.useClusterServers() .setSubscriptionMode(subscriptionMode) diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index fc6b637d4..34deb0542 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -3,25 +3,23 @@ package org.redisson; import com.github.dockerjava.api.command.InspectContainerResponse; import com.github.dockerjava.api.model.ContainerNetwork; import com.github.dockerjava.api.model.ExposedPort; -import com.github.dockerjava.api.model.PortBinding; import com.github.dockerjava.api.model.Ports; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.redisson.ClusterRunner.ClusterProcesses; -import org.redisson.RedisRunner.KEYSPACE_EVENTS_OPTIONS; -import org.redisson.RedisRunner.RedisProcess; import org.redisson.api.*; import org.redisson.api.listener.*; import org.redisson.api.redisnode.RedisCluster; import org.redisson.api.redisnode.RedisClusterMaster; import org.redisson.api.redisnode.RedisClusterSlave; import org.redisson.api.redisnode.RedisNodes; -import org.redisson.client.*; +import org.redisson.client.RedisClient; +import org.redisson.client.RedisClientConfig; +import org.redisson.client.RedisConnection; +import org.redisson.client.RedisException; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.cluster.ClusterNodeInfo; import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; @@ -30,11 +28,10 @@ import org.redisson.misc.RedisURI; import org.testcontainers.containers.ContainerState; import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; import java.io.File; -import java.io.IOException; import java.io.Serializable; import java.time.Duration; import java.util.*; @@ -43,7 +40,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -87,7 +83,7 @@ public class RedissonTopicTest extends RedisDockerTest { } @Test - public void testCluster() throws IOException, InterruptedException { + public void testCluster() throws InterruptedException { GenericContainer redisCluster = new GenericContainer<>("vishnunair/docker-redis-cluster") .withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384) .withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10))); @@ -275,7 +271,7 @@ public class RedissonTopicTest extends RedisDockerTest { } @Test - public void testTopicState() throws InterruptedException { + public void testTopicState() { RTopic stringTopic = redisson.getTopic("test1", StringCodec.INSTANCE); for (int i = 0; i < 3; i++) { AtomicInteger stringMessageReceived = new AtomicInteger(); @@ -305,7 +301,7 @@ public class RedissonTopicTest extends RedisDockerTest { } @Test - public void testMultiTypeConnection() throws InterruptedException { + public void testMultiTypeConnection() { RTopic stringTopic = redisson.getTopic("test1", StringCodec.INSTANCE); AtomicBoolean stringMessageReceived = new AtomicBoolean(); stringTopic.addListener(String.class, new MessageListener() { @@ -585,7 +581,7 @@ public class RedissonTopicTest extends RedisDockerTest { } @Test - public void testRemoveAllListeners2() throws InterruptedException { + public void testRemoveAllListeners2() { RTopic topic1 = redisson.getTopic("topic1"); AtomicInteger counter = new AtomicInteger(); @@ -605,7 +601,7 @@ public class RedissonTopicTest extends RedisDockerTest { } @Test - public void testRemoveByInstance() throws InterruptedException { + public void testRemoveByInstance() { RTopic topic1 = redisson.getTopic("topic1"); MessageListener listener = new MessageListener() { @Override @@ -724,9 +720,7 @@ public class RedissonTopicTest extends RedisDockerTest { @Test public void testReattach() throws Exception { - GenericContainer redis = - new GenericContainer<>("redis:7.2") - .withExposedPorts(6379); + GenericContainer redis = createRedis(); redis.start(); Config config = new Config(); @@ -774,9 +768,7 @@ public class RedissonTopicTest extends RedisDockerTest { @Test public void testAddListenerFailover() throws Exception { - GenericContainer redis = - new GenericContainer<>("redis:7.2") - .withExposedPorts(6379); + GenericContainer redis = createRedis(); redis.start(); Config config = new Config(); @@ -908,148 +900,17 @@ public class RedissonTopicTest extends RedisDockerTest { assertThat(status.peekLast()).isEqualTo("ok"); executor1.shutdown(); + try { + assertThat(executor1.awaitTermination(20, TimeUnit.SECONDS)).isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } redissonClient.shutdown(); }, 1); } - private void withSentinel(BiConsumer>, Config> callback, int slaves) throws InterruptedException { - Network network = Network.newNetwork(); - - List>> nodes = new ArrayList<>(); - - GenericContainer master = - new GenericContainer<>("bitnami/redis:7.2.4") - .withNetwork(network) - .withEnv("REDIS_REPLICATION_MODE", "master") - .withEnv("ALLOW_EMPTY_PASSWORD", "yes") - .withNetworkAliases("redis") - .withExposedPorts(6379); - master.start(); - assert master.getNetwork() == network; - int masterPort = master.getFirstMappedPort(); - master.withCreateContainerCmdModifier(cmd -> { - cmd.getHostConfig().withPortBindings( - new PortBinding(Ports.Binding.bindPort(Integer.valueOf(masterPort)), - cmd.getExposedPorts()[0])); - }); - nodes.add(master); - - for (int i = 0; i < slaves; i++) { - GenericContainer slave = - new GenericContainer<>("bitnami/redis:7.2.4") - .withNetwork(network) - .withEnv("REDIS_REPLICATION_MODE", "slave") - .withEnv("REDIS_MASTER_HOST", "redis") - .withEnv("ALLOW_EMPTY_PASSWORD", "yes") - .withNetworkAliases("slave" + i) - .withExposedPorts(6379); - slave.start(); - int slavePort = slave.getFirstMappedPort(); - slave.withCreateContainerCmdModifier(cmd -> { - cmd.getHostConfig().withPortBindings( - new PortBinding(Ports.Binding.bindPort(Integer.valueOf(slavePort)), - cmd.getExposedPorts()[0])); - }); - nodes.add(slave); - } - - GenericContainer sentinel1 = - new GenericContainer<>("bitnami/redis-sentinel:7.2.4") - .withNetwork(network) - .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") - .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") - .withNetworkAliases("sentinel1") - .withExposedPorts(26379); - sentinel1.start(); - int sentinel1Port = sentinel1.getFirstMappedPort(); - sentinel1.withCreateContainerCmdModifier(cmd -> { - cmd.getHostConfig().withPortBindings( - new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel1Port)), - cmd.getExposedPorts()[0])); - }); - nodes.add(sentinel1); - - GenericContainer sentinel2 = - new GenericContainer<>("bitnami/redis-sentinel:7.2.4") - .withNetwork(network) - .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") - .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") - .withNetworkAliases("sentinel2") - .withExposedPorts(26379); - sentinel2.start(); - int sentinel2Port = sentinel2.getFirstMappedPort(); - sentinel2.withCreateContainerCmdModifier(cmd -> { - cmd.getHostConfig().withPortBindings( - new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel2Port)), - cmd.getExposedPorts()[0])); - }); - nodes.add(sentinel2); - - GenericContainer sentinel3 = - new GenericContainer<>("bitnami/redis-sentinel:7.2.4") - .withNetwork(network) - .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") - .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") - .withNetworkAliases("sentinel3") - .withExposedPorts(26379); - sentinel3.start(); - int sentinel3Port = sentinel3.getFirstMappedPort(); - sentinel3.withCreateContainerCmdModifier(cmd -> { - cmd.getHostConfig().withPortBindings( - new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel3Port)), - cmd.getExposedPorts()[0])); - }); - nodes.add(sentinel3); - - Thread.sleep(5000); - - Config config = new Config(); - config.setProtocol(protocol); - config.useSentinelServers() - .setNatMapper(new NatMapper() { - - @Override - public RedisURI map(RedisURI uri) { - for (GenericContainer> node : nodes) { - if (node.getContainerInfo() == null) { - continue; - } - - Ports.Binding[] mappedPort = node.getContainerInfo().getNetworkSettings() - .getPorts().getBindings().get(new ExposedPort(uri.getPort())); - - Map ss = node.getContainerInfo().getNetworkSettings().getNetworks(); - ContainerNetwork s = ss.values().iterator().next(); - - if (uri.getPort() == 6379 && node.getNetworkAliases().contains("slave0")) { - return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); - } - - if ("redis".equals(uri.getHost()) - && node.getNetworkAliases().contains(uri.getHost())) { - return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); - } - - if (mappedPort != null - && s.getIpAddress().equals(uri.getHost())) { - return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); - } - } - return uri; - } - }) - .addSentinelAddress("redis://127.0.0.1:" + sentinel1.getFirstMappedPort()) - .setMasterName("mymaster"); - - callback.accept(nodes, config); - - nodes.forEach(n -> n.stop()); - network.close(); - } - - @Test public void testReattachInSentinel() throws Exception { withSentinel((nodes, config) -> { @@ -1143,25 +1004,25 @@ public class RedissonTopicTest extends RedisDockerTest { .sentinel() .sentinelMonitor("myMaster", "127.0.0.1", 6440, 2) .run(); - - Thread.sleep(5000); - + + Thread.sleep(5000); + Config config = new Config(); config.useSentinelServers() .setLoadBalancer(new RandomLoadBalancer()) .addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster"); RedissonClient redisson = Redisson.create(config); - + final AtomicBoolean executed = new AtomicBoolean(); final AtomicInteger subscriptions = new AtomicInteger(); - + RTopic topic = redisson.getTopic("topic"); topic.addListener(new StatusListener() { - + @Override public void onUnsubscribe(String channel) { } - + @Override public void onSubscribe(String channel) { subscriptions.incrementAndGet(); @@ -1173,26 +1034,26 @@ public class RedissonTopicTest extends RedisDockerTest { executed.set(true); } }); - + sendCommands(redisson, "topic"); - + sentinel1.stop(); sentinel2.stop(); sentinel3.stop(); master.stop(); slave1.stop(); slave2.stop(); - + Thread.sleep(TimeUnit.SECONDS.toMillis(20)); - + topic.removeAllListeners(); long t = System.currentTimeMillis(); topic.addListenerAsync(new StatusListener() { - + @Override public void onUnsubscribe(String channel) { } - + @Override public void onSubscribe(String channel) { subscriptions.incrementAndGet(); @@ -1204,9 +1065,9 @@ public class RedissonTopicTest extends RedisDockerTest { executed.set(true); } }); - + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); - + master = new RedisRunner() .port(6390) .nosave() @@ -1245,12 +1106,12 @@ public class RedissonTopicTest extends RedisDockerTest { .sentinel() .sentinelMonitor("myMaster", "127.0.0.1", 6390, 2) .run(); - + redisson.getTopic("topic").publish(1); - + await().atMost(20, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); assertThat(executed.get()).isTrue(); - + redisson.shutdown(); sentinel1.stop(); sentinel2.stop(); @@ -1259,13 +1120,13 @@ public class RedissonTopicTest extends RedisDockerTest { slave1.stop(); slave2.stop(); } - + protected Thread sendCommands(RedissonClient redisson, String topicName) { Thread t = new Thread() { @Override public void run() { List> futures = new ArrayList>(); - + for (int i = 0; i < 100; i++) { RFuture f1 = redisson.getBucket("i" + i).getAsync(); RFuture f2 = redisson.getBucket("i" + i).setAsync(""); @@ -1274,7 +1135,7 @@ public class RedissonTopicTest extends RedisDockerTest { futures.add(f2); futures.add(f3); } - + for (RFuture rFuture : futures) { try { rFuture.toCompletableFuture().join(); @@ -1289,7 +1150,7 @@ public class RedissonTopicTest extends RedisDockerTest { } @Test - public void testClusterSharding() throws IOException, InterruptedException { + public void testClusterSharding() { testInCluster(redisson -> { AtomicInteger counter = new AtomicInteger(); for (int i = 0; i < 10; i++) { @@ -1454,6 +1315,11 @@ public class RedissonTopicTest extends RedisDockerTest { assertThat(status.peekLast()).isEqualTo("ok"); executor1.shutdown(); + try { + assertThat(executor1.awaitTermination(20, TimeUnit.SECONDS)).isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } redissonClient.shutdown(); @@ -1461,62 +1327,55 @@ public class RedissonTopicTest extends RedisDockerTest { } @Test - public void testReattachInClusterMaster2() throws Exception { - RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); - RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); - RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterProcesses process = clusterRunner.run(); - - Thread.sleep(7000); - - Config config = new Config(); - config.useClusterServers() - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); + public void testReattachInClusterMaster2() { + withClusterFailover(redisson -> { - Queue messages = new ConcurrentLinkedQueue<>(); - Queue subscriptions = new ConcurrentLinkedQueue<>(); - - int topicsAmount = 100; - for (int i = 0; i < topicsAmount; i++) { - RTopic topic = redisson.getTopic("topic" + i); - int finalI = i; - topic.addListener(new StatusListener() { + Queue messages = new ConcurrentLinkedQueue<>(); + Queue subscriptions = new ConcurrentLinkedQueue<>(); - @Override - public void onUnsubscribe(String channel) { - } + int topicsAmount = 100; + for (int i = 0; i < topicsAmount; i++) { + RTopic topic = redisson.getTopic("topic" + i); + int finalI = i; + topic.addListener(new StatusListener() { - @Override - public void onSubscribe(String channel) { - subscriptions.add("topic" + finalI); - } - }); - topic.addListener(String.class, (channel, msg) -> messages.add(msg)); - } + @Override + public void onUnsubscribe(String channel) { + } - RedisRunner.RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get(); - master.stop(); + @Override + public void onSubscribe(String channel) { + subscriptions.add("topic" + finalI); + } + }); + topic.addListener(String.class, (channel, msg) -> messages.add(msg)); + } - Thread.sleep(TimeUnit.SECONDS.toMillis(40)); + RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER); + Optional f = rnc.getMasters().stream().findFirst(); + RedisClusterMaster master = f.get(); + RedisClientConfig cc = new RedisClientConfig(); + cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort()); + RedisClient c = RedisClient.create(cc); + c.connect().async(RedisCommands.SHUTDOWN); + c.shutdown(); - assertThat(subscriptions).hasSize(140); + Awaitility.waitAtMost(Duration.ofSeconds(40)).untilAsserted(() -> { + assertThat(subscriptions).hasSizeGreaterThan(125); + }); - for (int i = 0; i < topicsAmount; i++) { - RTopic topic = redisson.getTopic("topic" + i); - topic.publish("topic" + i); - } + for (int i = 0; i < topicsAmount; i++) { + RTopic topic = redisson.getTopic("topic" + i); + topic.publish("topic" + i); + } - Thread.sleep(100); - assertThat(messages).hasSize(topicsAmount); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertThat(messages).hasSize(topicsAmount); + }); } public void withCluster(Consumer callback) { @@ -1545,9 +1404,11 @@ public class RedissonTopicTest extends RedisDockerTest { redisCluster.stop(); } - public void withCluster2(Consumer callback) { + protected void withClusterFailover(Consumer callback) { List nodes = new ArrayList<>(); + LogMessageWaitStrategy wait2 = new LogMessageWaitStrategy().withRegEx(".*REPLICA\ssync\\:\sFinished\swith\ssuccess.*"); + DockerComposeContainer environment = new DockerComposeContainer(new File("src/test/resources/docker-compose.yml")) .withExposedService("redis-node-0", 6379) @@ -1555,19 +1416,17 @@ public class RedissonTopicTest extends RedisDockerTest { .withExposedService("redis-node-2", 6379) .withExposedService("redis-node-3", 6379) .withExposedService("redis-node-4", 6379) - .withExposedService("redis-node-5", 6379) - .withExposedService("redis-node-6", 6379) - .withExposedService("redis-node-7", 6379); + .withExposedService("redis-node-5", 6379, wait2); environment.start(); try { - Thread.sleep(25000); + Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } - for (int i = 0; i < 8; i++) { + for (int i = 0; i < 6; i++) { Optional cc = environment.getContainerByServiceName("redis-node-" + i); nodes.add(cc.get().getContainerInfo()); } @@ -1599,162 +1458,139 @@ public class RedissonTopicTest extends RedisDockerTest { RedissonClient redisson = Redisson.create(config); - RedisCluster nodes2 = redisson.getRedisNodes(RedisNodes.CLUSTER); - for (RedisClusterSlave slave : nodes2.getSlaves()) { - slave.setConfig("cluster-node-timeout", "1000"); - } - for (RedisClusterMaster master : nodes2.getMasters()) { - master.setConfig("cluster-node-timeout", "1000"); - } - callback.accept(redisson); redisson.shutdown(); environment.stop(); } @Test - public void testReattachInClusterMaster() throws Exception { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); - + public void testReattachInClusterMaster() { + withClusterFailover(redissonClient -> { + Config cfg = redissonClient.getConfig(); + cfg.useClusterServers().setSubscriptionMode(SubscriptionMode.MASTER); - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterProcesses process = clusterRunner.run(); + RedissonClient redisson = Redisson.create(cfg); + final AtomicBoolean executed = new AtomicBoolean(); + final AtomicInteger subscriptions = new AtomicInteger(); - Thread.sleep(5000); + RTopic topic = redisson.getTopic("3"); + topic.addListener(new StatusListener() { - Config config = new Config(); - config.useClusterServers() - .setSubscriptionMode(SubscriptionMode.MASTER) - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); + @Override + public void onUnsubscribe(String channel) { + } - final AtomicBoolean executed = new AtomicBoolean(); - final AtomicInteger subscriptions = new AtomicInteger(); + @Override + public void onSubscribe(String channel) { + subscriptions.incrementAndGet(); + } + }); + topic.addListener(Integer.class, new MessageListener() { + @Override + public void onMessage(CharSequence channel, Integer msg) { + executed.set(true); + } + }); - RTopic topic = redisson.getTopic("3"); - topic.addListener(new StatusListener() { + sendCommands(redisson, "3"); - @Override - public void onUnsubscribe(String channel) { + RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER); + for (RedisClusterMaster master : rnc.getMasters()) { + RedisClientConfig cc = new RedisClientConfig(); + cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort()); + RedisClient c = RedisClient.create(cc); + RedisConnection cn = c.connect(); + List channels = cn.sync(RedisCommands.PUBSUB_CHANNELS); + if (channels.contains("3")) { + cn.async(RedisCommands.SHUTDOWN); + } + c.shutdown(); } - @Override - public void onSubscribe(String channel) { - subscriptions.incrementAndGet(); - } - }); - topic.addListener(Integer.class, new MessageListener() { - @Override - public void onMessage(CharSequence channel, Integer msg) { - executed.set(true); + try { + Thread.sleep(25000); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }); - sendCommands(redisson, "3"); + redisson.getTopic("3").publish(1); - process.getNodes().stream().filter(x -> master1.getPort() == x.getRedisServerPort()) - .forEach(x -> { - try { - x.stop(); - Thread.sleep(18000); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - }); - - Thread.sleep(25000); - - redisson.getTopic("3").publish(1); - - await().atMost(75, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); - assertThat(executed.get()).isTrue(); + await().atMost(75, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); + assertThat(executed.get()).isTrue(); - redisson.shutdown(); - process.shutdown(); + redisson.shutdown(); + }); } @Test - public void testReattachPatternTopicListenersOnClusterFailover() throws Exception { - final KEYSPACE_EVENTS_OPTIONS keyspaceEvents[] = - {KEYSPACE_EVENTS_OPTIONS.K, KEYSPACE_EVENTS_OPTIONS.E, KEYSPACE_EVENTS_OPTIONS.A}; - final RedisRunner master = new RedisRunner().randomPort().randomDir().nosave() - .notifyKeyspaceEvents(keyspaceEvents); - final RedisRunner slave = new RedisRunner().randomPort().randomDir().nosave() - .notifyKeyspaceEvents(keyspaceEvents); - - final ClusterRunner clusterRunner = new ClusterRunner().addNode(master, slave); - final ClusterProcesses process = clusterRunner.run(); - - final Config config = new Config(); - config.useClusterServers().addNodeAddress( - process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + public void testReattachPatternTopicListenersOnClusterFailover() { + withClusterFailover(redisson -> { + RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); + for (RedisClusterMaster master : nodes.getMasters()) { + master.setConfig("notify-keyspace-events", "K$"); + } + for (RedisClusterSlave slave : nodes.getSlaves()) { + slave.setConfig("notify-keyspace-events", "K$"); + } - final RedissonClient redisson = Redisson.create(config); + AtomicInteger subscriptions = new AtomicInteger(); + AtomicInteger messagesReceived = new AtomicInteger(); - final AtomicInteger subscriptions = new AtomicInteger(); - final AtomicInteger messagesReceived = new AtomicInteger(); + RPatternTopic topic = + redisson.getPatternTopic("__keyspace*__:i*", StringCodec.INSTANCE); + topic.addListener(new PatternStatusListener() { + @Override + public void onPUnsubscribe(String pattern) {} - final RPatternTopic topic = - redisson.getPatternTopic("__keyspace*__:i*", StringCodec.INSTANCE); - topic.addListener(new PatternStatusListener() { - @Override - public void onPUnsubscribe(String pattern) {} + @Override + public void onPSubscribe(String pattern) { + subscriptions.incrementAndGet(); + } + }); + topic.addListener(String.class, + (pattern, channel, msg) -> messagesReceived.incrementAndGet()); + assertThat(subscriptions.get()).isEqualTo(1); - @Override - public void onPSubscribe(String pattern) { - subscriptions.incrementAndGet(); + try { + sendCommands(redisson, "dummy").join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }); - topic.addListener(String.class, - (pattern, channel, msg) -> messagesReceived.incrementAndGet()); - assertThat(subscriptions.get()).isEqualTo(1); - - sendCommands(redisson, "dummy").join(); - await().atMost(30, TimeUnit.SECONDS).until(() -> messagesReceived.get() == 100); - - failover(process, master, slave); + await().atMost(30, TimeUnit.SECONDS).until(() -> { + return messagesReceived.get() == 100; + }); - redisson.getBucket("i100").set(""); - await().atMost(30, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); - await().atMost(5, TimeUnit.SECONDS).until(() -> messagesReceived.get() == 101); + RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER); + for (RedisClusterMaster master : rnc.getMasters()) { + RedisClientConfig cc = new RedisClientConfig(); + cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort()); + RedisClient c = RedisClient.create(cc); + RedisConnection cn = c.connect(); + try { + Boolean res = cn.sync(RedisCommands.EXISTS, "i99"); + if (res) { + cn.async(RedisCommands.SHUTDOWN); + } + } catch (Exception e) { + // skip + } + c.shutdown(); + } - redisson.shutdown(); - process.shutdown(); - } + await().atMost(30, TimeUnit.SECONDS).until(() -> { + return subscriptions.get() == 2; + }); - private void failover(ClusterProcesses processes, RedisRunner master, RedisRunner slave) - throws InterruptedException { - final RedisClient masterClient = connect(processes, master); - try { - masterClient.connect().sync(new RedisStrictCommand("DEBUG", "SEGFAULT")); - } catch (RedisTimeoutException e) { - // node goes down, so this command times out waiting for the response - } - Thread.sleep(java.time.Duration.ofSeconds(25).toMillis()); + for (RedisClusterMaster master : nodes.getMasters()) { + master.setConfig("notify-keyspace-events", "K$"); + } - final RedisClient slaveClient = connect(processes, slave); - slaveClient.connect().sync(new RedisStrictCommand("CLUSTER", "FAILOVER"), "TAKEOVER"); - Thread.sleep(java.time.Duration.ofSeconds(25).toMillis()); + redisson.getBucket("i99").set(""); + await().atMost(1, TimeUnit.SECONDS).until(() -> { + System.out.println("messagesReceived.get() " + messagesReceived.get()); + return messagesReceived.get() == 101; + }); + }); } - private RedisClient connect(ClusterProcesses processes, RedisRunner runner) { - return RedisClient.create(new RedisClientConfig() - .setAddress(processes.getNodes().stream() - .filter(node -> node.getRedisServerPort() == runner.getPort()) - .findFirst() - .map(RedisProcess::getRedisServerAddressAndPort) - .orElseThrow(() -> new IllegalArgumentException( - "Failed to find node running at port: " + runner.getPort() - + " in cluster processes")))); - } } diff --git a/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java b/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java index e83a4ebdd..7ff3cf472 100644 --- a/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java +++ b/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java @@ -8,6 +8,7 @@ import java.util.Map; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.redisson.RedisDockerTest; import org.redisson.RedisRunner; import org.redisson.RedisRunner.RedisProcess; import org.redisson.Redisson; @@ -15,28 +16,33 @@ import org.redisson.api.RedissonClient; import org.redisson.client.WriteRedisConnectionException; import org.redisson.config.Config; import org.redisson.config.ReadMode; +import org.testcontainers.containers.GenericContainer; -public class WeightedRoundRobinBalancerTest { +public class WeightedRoundRobinBalancerTest extends RedisDockerTest { @Test public void testUseMasterForReadsIfNoConnectionsToSlaves() { - Assertions.assertThrows(WriteRedisConnectionException.class, () -> { - RedisProcess master = null; - RedisProcess slave = null; + GenericContainer master = null; + GenericContainer slave = null; RedissonClient client = null; try { - master = redisTestInstance(); - slave = redisTestInstance(); + master = createRedis(); + master.start(); + slave = createRedis(); + slave.start(); + + String masterurl = "redis://" + master.getHost() + ":" + master.getFirstMappedPort(); + String slaveurl = "redis://" + slave.getHost() + ":" + slave.getFirstMappedPort(); Map weights = new HashMap<>(); - weights.put(master.getRedisServerAddressAndPort(), 1); - weights.put(slave.getRedisServerAddressAndPort(), 2); + weights.put(masterurl, 1); + weights.put(slaveurl, 2); Config config = new Config(); config.useMasterSlaveServers() .setReadMode(ReadMode.SLAVE) - .setMasterAddress(master.getRedisServerAddressAndPort()) - .addSlaveAddress(slave.getRedisServerAddressAndPort()) + .setMasterAddress(masterurl) + .addSlaveAddress(slaveurl) .setLoadBalancer(new WeightedRoundRobinBalancer(weights, 1)); client = Redisson.create(config); @@ -47,7 +53,9 @@ public class WeightedRoundRobinBalancerTest { slave.stop(); RedissonClient clientCopy = client; - assertThat(clientCopy.getBucket("key").get()).isNull(); + Assertions.assertThrows(WriteRedisConnectionException.class, () -> { + clientCopy.getBucket("key").get(); + }); } finally { if (master != null) { master.stop(); @@ -59,14 +67,6 @@ public class WeightedRoundRobinBalancerTest { client.shutdown(); } } - }); } - private RedisProcess redisTestInstance() throws IOException, InterruptedException { - return new RedisRunner() - .nosave() - .randomDir() - .randomPort() - .run(); - } } diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 79c69c032..c9f54fc46 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -1,33 +1,28 @@ package org.redisson.executor; -import com.github.dockerjava.api.model.ContainerNetwork; -import com.github.dockerjava.api.model.ExposedPort; -import com.github.dockerjava.api.model.PortBinding; -import com.github.dockerjava.api.model.Ports; import mockit.Invocation; import mockit.Mock; import mockit.MockUp; import org.awaitility.Awaitility; import org.junit.jupiter.api.*; -import org.redisson.*; +import org.redisson.RedisDockerTest; +import org.redisson.Redisson; +import org.redisson.RedissonNode; import org.redisson.api.*; import org.redisson.api.annotation.RInject; import org.redisson.api.executor.TaskFinishedListener; import org.redisson.api.executor.TaskStartedListener; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; -import org.redisson.connection.balancer.RandomLoadBalancer; -import org.redisson.misc.RedisURI; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; import java.io.IOException; import java.io.Serializable; import java.time.Duration; -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -37,7 +32,7 @@ public class RedissonExecutorServiceTest extends RedisDockerTest { private static RedissonNode node; @BeforeEach - public void before() throws IOException, InterruptedException { + public void before() { Config config = createConfig(); RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1)); @@ -674,140 +669,4 @@ public class RedissonExecutorServiceTest extends RedisDockerTest { }); } - private void withSentinel(BiConsumer>, Config> callback, int slaves) throws InterruptedException { - Network network = Network.newNetwork(); - - List>> nodes = new ArrayList<>(); - - GenericContainer master = - new GenericContainer<>("bitnami/redis:7.2.4") - .withNetwork(network) - .withEnv("REDIS_REPLICATION_MODE", "master") - .withEnv("ALLOW_EMPTY_PASSWORD", "yes") - .withNetworkAliases("redis") - .withExposedPorts(6379); - master.start(); - assert master.getNetwork() == network; - int masterPort = master.getFirstMappedPort(); - master.withCreateContainerCmdModifier(cmd -> { - cmd.getHostConfig().withPortBindings( - new PortBinding(Ports.Binding.bindPort(Integer.valueOf(masterPort)), - cmd.getExposedPorts()[0])); - }); - nodes.add(master); - - for (int i = 0; i < slaves; i++) { - GenericContainer slave = - new GenericContainer<>("bitnami/redis:7.2.4") - .withNetwork(network) - .withEnv("REDIS_REPLICATION_MODE", "slave") - .withEnv("REDIS_MASTER_HOST", "redis") - .withEnv("ALLOW_EMPTY_PASSWORD", "yes") - .withNetworkAliases("slave" + i) - .withExposedPorts(6379); - slave.start(); - int slavePort = slave.getFirstMappedPort(); - slave.withCreateContainerCmdModifier(cmd -> { - cmd.getHostConfig().withPortBindings( - new PortBinding(Ports.Binding.bindPort(Integer.valueOf(slavePort)), - cmd.getExposedPorts()[0])); - }); - nodes.add(slave); - } - - GenericContainer sentinel1 = - new GenericContainer<>("bitnami/redis-sentinel:7.2.4") - .withNetwork(network) - .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") - .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") - .withNetworkAliases("sentinel1") - .withExposedPorts(26379); - sentinel1.start(); - int sentinel1Port = sentinel1.getFirstMappedPort(); - sentinel1.withCreateContainerCmdModifier(cmd -> { - cmd.getHostConfig().withPortBindings( - new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel1Port)), - cmd.getExposedPorts()[0])); - }); - nodes.add(sentinel1); - - GenericContainer sentinel2 = - new GenericContainer<>("bitnami/redis-sentinel:7.2.4") - .withNetwork(network) - .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") - .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") - .withNetworkAliases("sentinel2") - .withExposedPorts(26379); - sentinel2.start(); - int sentinel2Port = sentinel2.getFirstMappedPort(); - sentinel2.withCreateContainerCmdModifier(cmd -> { - cmd.getHostConfig().withPortBindings( - new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel2Port)), - cmd.getExposedPorts()[0])); - }); - nodes.add(sentinel2); - - GenericContainer sentinel3 = - new GenericContainer<>("bitnami/redis-sentinel:7.2.4") - .withNetwork(network) - .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") - .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") - .withNetworkAliases("sentinel3") - .withExposedPorts(26379); - sentinel3.start(); - int sentinel3Port = sentinel3.getFirstMappedPort(); - sentinel3.withCreateContainerCmdModifier(cmd -> { - cmd.getHostConfig().withPortBindings( - new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel3Port)), - cmd.getExposedPorts()[0])); - }); - nodes.add(sentinel3); - - Thread.sleep(5000); - - Config config = new Config(); - config.setProtocol(protocol); - config.useSentinelServers() - .setNatMapper(new NatMapper() { - - @Override - public RedisURI map(RedisURI uri) { - for (GenericContainer> node : nodes) { - if (node.getContainerInfo() == null) { - continue; - } - - Ports.Binding[] mappedPort = node.getContainerInfo().getNetworkSettings() - .getPorts().getBindings().get(new ExposedPort(uri.getPort())); - - Map ss = node.getContainerInfo().getNetworkSettings().getNetworks(); - ContainerNetwork s = ss.values().iterator().next(); - - if (uri.getPort() == 6379 && node.getNetworkAliases().contains("slave0")) { - return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); - } - - if ("redis".equals(uri.getHost()) - && node.getNetworkAliases().contains(uri.getHost())) { - return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); - } - - if (mappedPort != null - && s.getIpAddress().equals(uri.getHost())) { - return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); - } - } - return uri; - } - }) - .addSentinelAddress("redis://127.0.0.1:" + sentinel1.getFirstMappedPort()) - .setMasterName("mymaster"); - - callback.accept(nodes, config); - - nodes.forEach(n -> n.stop()); - network.close(); - } - - } diff --git a/redisson/src/test/resources/docker-compose.yml b/redisson/src/test/resources/docker-compose.yml new file mode 100644 index 000000000..59426ce87 --- /dev/null +++ b/redisson/src/test/resources/docker-compose.yml @@ -0,0 +1,100 @@ +# Copyright VMware, Inc. +# SPDX-License-Identifier: APACHE-2.0 + +version: '2' + +networks: + app-tier: + driver: bridge + +services: + redis-node-0: + image: docker.io/bitnami/redis-cluster:7.2.4 + environment: + - 'ALLOW_EMPTY_PASSWORD=yes' + - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5' + ports: + - '6379' + networks: + - app-tier + + redis-node-1: + image: docker.io/bitnami/redis-cluster:7.2.4 + environment: + - 'ALLOW_EMPTY_PASSWORD=yes' + - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5' + ports: + - '6379' + networks: + - app-tier + + redis-node-2: + image: docker.io/bitnami/redis-cluster:7.2.4 + environment: + - 'ALLOW_EMPTY_PASSWORD=yes' + - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5' + ports: + - '6379' + networks: + - app-tier + + redis-node-3: + image: docker.io/bitnami/redis-cluster:7.2.4 + environment: + - 'ALLOW_EMPTY_PASSWORD=yes' + - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5' + ports: + - '6379' + networks: + - app-tier + + redis-node-4: + image: docker.io/bitnami/redis-cluster:7.2.4 + environment: + - 'ALLOW_EMPTY_PASSWORD=yes' + - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5' + ports: + - '6379' + networks: + - app-tier + +# redis-node-5: +# image: docker.io/bitnami/redis-cluster:7.2.4 +# environment: +# - 'ALLOW_EMPTY_PASSWORD=yes' +# - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5' +# ports: +# - '6379' +# networks: +# - app-tier + +# redis-node-6: +# image: docker.io/bitnami/redis-cluster:7.2.4 +# environment: +# - 'ALLOW_EMPTY_PASSWORD=yes' +# - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5' +# ports: +# - '6379' +# networks: +# - app-tier + + redis-node-5: + image: docker.io/bitnami/redis-cluster:7.2.4 + depends_on: + - redis-node-0 + - redis-node-1 + - redis-node-2 + - redis-node-3 + - redis-node-4 +# - redis-node-5 +# - redis-node-6 + environment: + - 'ALLOW_EMPTY_PASSWORD=yes' + - 'REDIS_CLUSTER_REPLICAS=1' + - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5' +# - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5 redis-node-6 redis-node-7' + - 'REDIS_CLUSTER_CREATOR=yes' + ports: + - '6379' + networks: + - app-tier