refactoring

pull/5644/head
Nikita Koksharov 12 months ago
parent 96eda8422c
commit 26014789c9

@ -22,24 +22,17 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
public class RedissonShardedTopicTest {
@Container
private final GenericContainer<?> redisClusterContainer =
new GenericContainer<>("vishnunair/docker-redis-cluster")
.withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384)
.withStartupCheckStrategy(
new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(6))
);
public class RedissonShardedTopicTest extends RedisDockerTest {
@Test
public void testInvalidCommand() throws IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner.RedisProcess p = master1.run();
public void testInvalidCommand() {
GenericContainer<?> redis = createRedis("6.2");
redis.start();
Config config = new Config();
config.useSingleServer().setAddress(p.getRedisServerAddressAndPort());
config.setProtocol(protocol);
config.useSingleServer()
.setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort());
RedissonClient redisson = Redisson.create(config);
RShardedTopic t = redisson.getShardedTopic("ttt");
@ -50,53 +43,35 @@ public class RedissonShardedTopicTest {
assertThat(e.getMessage()).contains("ERR unknown command `SSUBSCRIBE`");
redisson.shutdown();
p.stop();
redis.stop();
}
@Test
public void testClusterSharding() {
Config config = new Config();
config.useClusterServers()
.setPingConnectionInterval(0)
.setNatMapper(new NatMapper() {
@Override
public RedisURI map(RedisURI uri) {
if (redisClusterContainer.getMappedPort(uri.getPort()) == null) {
return uri;
}
return new RedisURI(uri.getScheme(), redisClusterContainer.getHost(), redisClusterContainer.getMappedPort(uri.getPort()));
}
})
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress("redis://127.0.0.1:" + redisClusterContainer.getFirstMappedPort());
RedissonClient redisson = Redisson.create(config);
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 10; i++) {
int j = i;
RTopic topic = redisson.getShardedTopic("test" + i);
topic.addListener(Integer.class, (c, v) -> {
assertThat(v).isEqualTo(j);
counter.incrementAndGet();
});
}
for (int i = 0; i < 10; i++) {
RTopic topic = redisson.getShardedTopic("test" + i);
long s = topic.publish(i);
assertThat(s).isEqualTo(1);
}
Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> counter.get() == 10);
for (int i = 0; i < 10; i++) {
RTopic topic = redisson.getShardedTopic("test" + i);
topic.removeAllListeners();
}
redisson.shutdown();
testInCluster(redisson -> {
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 10; i++) {
int j = i;
RTopic topic = redisson.getShardedTopic("test" + i);
topic.addListener(Integer.class, (c, v) -> {
assertThat(v).isEqualTo(j);
counter.incrementAndGet();
});
}
for (int i = 0; i < 10; i++) {
RTopic topic = redisson.getShardedTopic("test" + i);
long s = topic.publish(i);
assertThat(s).isEqualTo(1);
}
Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> counter.get() == 10);
for (int i = 0; i < 10; i++) {
RTopic topic = redisson.getShardedTopic("test" + i);
topic.removeAllListeners();
}
});
}

Loading…
Cancel
Save