diff --git a/redisson/src/test/java/org/redisson/RedissonShardedTopicTest.java b/redisson/src/test/java/org/redisson/RedissonShardedTopicTest.java index beb539e0e..493d45125 100644 --- a/redisson/src/test/java/org/redisson/RedissonShardedTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonShardedTopicTest.java @@ -22,24 +22,17 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; -@Testcontainers -public class RedissonShardedTopicTest { - - @Container - private final GenericContainer redisClusterContainer = - new GenericContainer<>("vishnunair/docker-redis-cluster") - .withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384) - .withStartupCheckStrategy( - new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(6)) - ); +public class RedissonShardedTopicTest extends RedisDockerTest { @Test - public void testInvalidCommand() throws IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner.RedisProcess p = master1.run(); + public void testInvalidCommand() { + GenericContainer redis = createRedis("6.2"); + redis.start(); Config config = new Config(); - config.useSingleServer().setAddress(p.getRedisServerAddressAndPort()); + config.setProtocol(protocol); + config.useSingleServer() + .setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort()); RedissonClient redisson = Redisson.create(config); RShardedTopic t = redisson.getShardedTopic("ttt"); @@ -50,53 +43,35 @@ public class RedissonShardedTopicTest { assertThat(e.getMessage()).contains("ERR unknown command `SSUBSCRIBE`"); redisson.shutdown(); - p.stop(); + redis.stop(); } @Test public void testClusterSharding() { - Config config = new Config(); - - config.useClusterServers() - .setPingConnectionInterval(0) - .setNatMapper(new NatMapper() { - @Override - public RedisURI map(RedisURI uri) { - if (redisClusterContainer.getMappedPort(uri.getPort()) == null) { - return uri; - } - return new RedisURI(uri.getScheme(), redisClusterContainer.getHost(), redisClusterContainer.getMappedPort(uri.getPort())); - } - }) - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress("redis://127.0.0.1:" + redisClusterContainer.getFirstMappedPort()); - RedissonClient redisson = Redisson.create(config); - - AtomicInteger counter = new AtomicInteger(); - for (int i = 0; i < 10; i++) { - int j = i; - RTopic topic = redisson.getShardedTopic("test" + i); - topic.addListener(Integer.class, (c, v) -> { - assertThat(v).isEqualTo(j); - counter.incrementAndGet(); - }); - } - - for (int i = 0; i < 10; i++) { - RTopic topic = redisson.getShardedTopic("test" + i); - long s = topic.publish(i); - assertThat(s).isEqualTo(1); - } - - Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> counter.get() == 10); - - for (int i = 0; i < 10; i++) { - RTopic topic = redisson.getShardedTopic("test" + i); - topic.removeAllListeners(); - } - - redisson.shutdown(); - + testInCluster(redisson -> { + AtomicInteger counter = new AtomicInteger(); + for (int i = 0; i < 10; i++) { + int j = i; + RTopic topic = redisson.getShardedTopic("test" + i); + topic.addListener(Integer.class, (c, v) -> { + assertThat(v).isEqualTo(j); + counter.incrementAndGet(); + }); + } + + for (int i = 0; i < 10; i++) { + RTopic topic = redisson.getShardedTopic("test" + i); + long s = topic.publish(i); + assertThat(s).isEqualTo(1); + } + + Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> counter.get() == 10); + + for (int i = 0; i < 10; i++) { + RTopic topic = redisson.getShardedTopic("test" + i); + topic.removeAllListeners(); + } + }); }