refactoring

pull/5457/head
Nikita Koksharov 1 year ago
parent 69228326c3
commit 77ddce68a2

@ -1,12 +1,13 @@
package org.redisson;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.model.ContainerNetwork;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.KEYSPACE_EVENTS_OPTIONS;
import org.redisson.RedisRunner.RedisProcess;
@ -24,14 +25,15 @@ import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.SequentialDnsAddressResolverFactory;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.RedisURI;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
@ -1000,9 +1002,6 @@ public class RedissonTopicTest extends RedisDockerTest {
Thread.sleep(5000);
Config config = new Config();
config.setAddressResolverGroupFactory(new SequentialDnsAddressResolverFactory() {
});
config.useSentinelServers()
.setNatMapper(new NatMapper() {
@ -1191,7 +1190,6 @@ public class RedissonTopicTest extends RedisDockerTest {
@Override
public void onSubscribe(String channel) {
System.out.println("onSubscribe " + (System.currentTimeMillis() - t));
subscriptions.incrementAndGet();
}
});
@ -1313,72 +1311,63 @@ public class RedissonTopicTest extends RedisDockerTest {
}
@Test
public void testReattachInClusterSlave() throws Exception {
GenericContainer redisCluster = new GenericContainer<>("vishnunair/docker-redis-cluster")
.withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384)
.withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10)));
redisCluster.start();
public void testReattachInClusterSlave() {
withCluster(client -> {
Config config = client.getConfig();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE);
RedissonClient redisson = Redisson.create(config);
Config config = new Config();
config.useClusterServers()
.setNatMapper(new NatMapper() {
@Override
public RedisURI map(RedisURI uri) {
if (redisCluster.getMappedPort(uri.getPort()) == null) {
return uri;
}
return new RedisURI(uri.getScheme(), redisCluster.getHost(), redisCluster.getMappedPort(uri.getPort()));
}
})
.setSubscriptionMode(SubscriptionMode.SLAVE)
.addNodeAddress("redis://127.0.0.1:" + redisCluster.getFirstMappedPort());
RedissonClient redisson = Redisson.create(config);
final AtomicBoolean executed = new AtomicBoolean();
final AtomicInteger subscriptions = new AtomicInteger();
RTopic topic = redisson.getTopic("topic");
topic.addListener(new StatusListener() {
@Override
public void onUnsubscribe(String channel) {
}
@Override
public void onSubscribe(String channel) {
subscriptions.incrementAndGet();
}
});
topic.addListener(Integer.class, new MessageListener<Integer>() {
@Override
public void onMessage(CharSequence channel, Integer msg) {
executed.set(true);
final AtomicBoolean executed = new AtomicBoolean();
final AtomicInteger subscriptions = new AtomicInteger();
RTopic topic = redisson.getTopic("topic");
topic.addListener(new StatusListener() {
@Override
public void onUnsubscribe(String channel) {
}
@Override
public void onSubscribe(String channel) {
subscriptions.incrementAndGet();
}
});
topic.addListener(Integer.class, new MessageListener<Integer>() {
@Override
public void onMessage(CharSequence channel, Integer msg) {
executed.set(true);
}
});
assertThat(topic.countListeners()).isEqualTo(2);
sendCommands(redisson, "topic");
assertThat(subscriptions.get()).isEqualTo(1);
RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER);
for (RedisClusterSlave slave : nodes.getSlaves()) {
RedisClientConfig cc = new RedisClientConfig();
cc.setAddress("redis://" + slave.getAddr().getHostString() + ":" + slave.getAddr().getPort());
RedisClient c = RedisClient.create(cc);
c.connect().async(RedisCommands.SHUTDOWN);
c.shutdown();
}
});
assertThat(topic.countListeners()).isEqualTo(2);
sendCommands(redisson, "topic");
RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER);
for (RedisClusterSlave slave : nodes.getSlaves()) {
RedisClientConfig cc = new RedisClientConfig();
cc.setAddress("redis://" + slave.getAddr().getHostString() + ":" + slave.getAddr().getPort());
RedisClient c = RedisClient.create(cc);
c.connect().async(RedisCommands.SHUTDOWN);
c.shutdown();
Thread.sleep(18000);
}
await().atMost(25, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
Thread.sleep(15000);
executed.set(false);
redisson.getTopic("topic").publish(1);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(executed.get()).isTrue();
assertThat(topic.countListeners()).isEqualTo(2);
redisson.getTopic("topic").publish(1);
await().atMost(75, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
assertThat(topic.countListeners()).isEqualTo(2);
assertThat(executed.get()).isTrue();
redisson.shutdown();
redisCluster.stop();
redisson.shutdown();
});
}
@Test
@ -1464,57 +1453,59 @@ public class RedissonTopicTest extends RedisDockerTest {
@Test
public void testReattachInClusterMaster2() throws Exception {
withCluster(redisson -> {
Queue<String> messages = new ConcurrentLinkedQueue<>();
Queue<String> subscriptions = new ConcurrentLinkedQueue<>();
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave();
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
int topicsAmount = 100;
for (int i = 0; i < topicsAmount; i++) {
RTopic topic = redisson.getTopic("topic" + i);
int finalI = i;
topic.addListener(new StatusListener() {
Config config = new Config();
config.useClusterServers()
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
@Override
public void onUnsubscribe(String channel) {
}
Queue<String> messages = new ConcurrentLinkedQueue<>();
Queue<String> subscriptions = new ConcurrentLinkedQueue<>();
@Override
public void onSubscribe(String channel) {
subscriptions.add("topic" + finalI);
}
});
topic.addListener(String.class, (channel, msg) -> messages.add(msg));
}
int topicsAmount = 100;
for (int i = 0; i < topicsAmount; i++) {
RTopic topic = redisson.getTopic("topic" + i);
int finalI = i;
topic.addListener(new StatusListener() {
RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER);
nodes.getMasters().forEach(m -> {
RedisClientConfig cc = new RedisClientConfig();
cc.setAddress("redis://" + m.getAddr().getHostString() + ":" + m.getAddr().getPort());
RedisClient c = RedisClient.create(cc);
c.connect().async(RedisCommands.SHUTDOWN);
c.shutdown();
@Override
public void onUnsubscribe(String channel) {
}
@Override
public void onSubscribe(String channel) {
subscriptions.add("topic" + finalI);
}
});
topic.addListener(String.class, (channel, msg) -> messages.add(msg));
}
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
RedisRunner.RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get();
master.stop();
assertThat(subscriptions).hasSize(topicsAmount*2);
Thread.sleep(TimeUnit.SECONDS.toMillis(40));
for (int i = 0; i < topicsAmount; i++) {
RTopic topic = redisson.getTopic("topic" + i);
topic.publish("topic" + i);
}
assertThat(subscriptions).hasSize(140);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(messages).hasSize(topicsAmount);
});
for (int i = 0; i < topicsAmount; i++) {
RTopic topic = redisson.getTopic("topic" + i);
topic.publish("topic" + i);
}
Thread.sleep(100);
assertThat(messages).hasSize(topicsAmount);
}
public void withCluster(Consumer<RedissonClient> callback) {
@ -1542,38 +1533,106 @@ public class RedissonTopicTest extends RedisDockerTest {
redisCluster.stop();
}
@Test
public void testReattachInClusterMaster() throws Exception {
GenericContainer redisCluster = new GenericContainer<>("vishnunair/docker-redis-cluster")
.withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384)
.withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10)));
redisCluster.start();
public void withCluster2(Consumer<RedissonClient> callback) {
List<InspectContainerResponse> nodes = new ArrayList<>();
DockerComposeContainer environment =
new DockerComposeContainer(new File("src/test/resources/docker-compose.yml"))
.withExposedService("redis-node-0", 6379)
.withExposedService("redis-node-1", 6379)
.withExposedService("redis-node-2", 6379)
.withExposedService("redis-node-3", 6379)
.withExposedService("redis-node-4", 6379)
.withExposedService("redis-node-5", 6379)
.withExposedService("redis-node-6", 6379)
.withExposedService("redis-node-7", 6379);
environment.start();
try {
Thread.sleep(25000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
for (int i = 0; i < 8; i++) {
Optional<ContainerState> cc = environment.getContainerByServiceName("redis-node-" + i);
nodes.add(cc.get().getContainerInfo());
}
Optional<ContainerState> cc2 = environment.getContainerByServiceName("redis-node-0");
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.MASTER)
.setNatMapper(new NatMapper() {
@Override
public RedisURI map(RedisURI uri) {
if (redisCluster.getMappedPort(uri.getPort()) == null) {
return uri;
for (InspectContainerResponse node : nodes) {
Ports.Binding[] mappedPort = node.getNetworkSettings()
.getPorts().getBindings().get(new ExposedPort(uri.getPort()));
Map<String, ContainerNetwork> ss = node.getNetworkSettings().getNetworks();
ContainerNetwork s = ss.values().iterator().next();
if (mappedPort != null
&& s.getIpAddress().equals(uri.getHost())) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
}
return new RedisURI(uri.getScheme(), redisCluster.getHost(), redisCluster.getMappedPort(uri.getPort()));
return uri;
}
})
.addNodeAddress("redis://127.0.0.1:" + redisCluster.getFirstMappedPort());
.addNodeAddress("redis://127.0.0.1:" + cc2.get().getFirstMappedPort());
RedissonClient redisson = Redisson.create(config);
RedisCluster nodes2 = redisson.getRedisNodes(RedisNodes.CLUSTER);
for (RedisClusterSlave slave : nodes2.getSlaves()) {
slave.setConfig("cluster-node-timeout", "1000");
}
for (RedisClusterMaster master : nodes2.getMasters()) {
master.setConfig("cluster-node-timeout", "1000");
}
callback.accept(redisson);
redisson.shutdown();
environment.stop();
}
@Test
public void testReattachInClusterMaster() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.MASTER)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
final AtomicBoolean executed = new AtomicBoolean();
final AtomicInteger subscriptions = new AtomicInteger();
RTopic topic = redisson.getTopic("3");
topic.addListener(new StatusListener() {
@Override
public void onUnsubscribe(String channel) {
}
@Override
public void onSubscribe(String channel) {
subscriptions.incrementAndGet();
@ -1585,32 +1644,29 @@ public class RedissonTopicTest extends RedisDockerTest {
executed.set(true);
}
});
sendCommands(redisson, "3");
RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER);
nodes.getMasters().forEach(m -> {
RedisClientConfig cc = new RedisClientConfig();
cc.setAddress("redis://" + m.getAddr().getHostString() + ":" + m.getAddr().getPort());
RedisClient c = RedisClient.create(cc);
c.connect().async(RedisCommands.SHUTDOWN);
c.shutdown();
try {
Thread.sleep(18000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
process.getNodes().stream().filter(x -> master1.getPort() == x.getRedisServerPort())
.forEach(x -> {
try {
x.stop();
Thread.sleep(18000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
Thread.sleep(25000);
redisson.getTopic("3").publish(1);
await().atMost(75, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
assertThat(executed.get()).isTrue();
redisson.shutdown();
redisCluster.stop();
process.shutdown();
}
@Test

Loading…
Cancel
Save