refactoring

pull/5884/head
Nikita Koksharov 10 months ago
parent e7580b62d9
commit 27a03c8d31

@ -505,10 +505,7 @@ public class RedisDockerTest {
return false; return false;
} }
String r = execute(node, "redis-cli", "info", "replication"); String r = execute(node, "redis-cli", "info", "replication");
if (r.contains("role:slave")) { return r.contains("role:slave");
return true;
}
return false;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
} }
@ -518,10 +515,7 @@ public class RedisDockerTest {
return false; return false;
} }
String r = execute(node, "redis-cli", "info", "replication"); String r = execute(node, "redis-cli", "info", "replication");
if (r.contains("role:master")) { return r.contains("role:master");
return true;
}
return false;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
} }

@ -20,6 +20,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode; import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.connection.balancer.RandomLoadBalancer;
import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.GenericContainer;
import java.io.Serializable; import java.io.Serializable;
@ -101,7 +102,6 @@ public class RedissonTopicPatternTest extends RedisDockerTest {
AtomicInteger counter = new AtomicInteger(); AtomicInteger counter = new AtomicInteger();
PatternMessageListener<String> listener = (pattern, channel, msg) -> { PatternMessageListener<String> listener = (pattern, channel, msg) -> {
System.out.println("mes " + channel + " counter " + counter.get());
counter.incrementAndGet(); counter.incrementAndGet();
}; };
topic.addListener(String.class, listener); topic.addListener(String.class, listener);
@ -349,70 +349,31 @@ public class RedissonTopicPatternTest extends RedisDockerTest {
t.removeAllListeners(); t.removeAllListeners();
} }
@Test // @Test
public void testReattachInClusterSlave() throws Exception { public void testReattachInClusterSlave() {
testReattachInCluster(SubscriptionMode.SLAVE); testReattachInCluster(SubscriptionMode.SLAVE);
} }
@Test @Test
public void testReattachInClusterMaster() throws Exception { public void testReattachInClusterMaster() {
testReattachInCluster(SubscriptionMode.MASTER); testReattachInCluster(SubscriptionMode.MASTER);
} }
private void testReattachInCluster(SubscriptionMode subscriptionMode) throws Exception { private void testReattachInCluster(SubscriptionMode subscriptionMode) {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave() withNewCluster((nds, instance) -> {
.notifyKeyspaceEvents( Config config = instance.getConfig();
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$);
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave()
.notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$);
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave()
.notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$);
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave()
.notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$);
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave()
.notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$);
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave()
.notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$);
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(7000);
Config config = new Config();
config.useClusterServers() config.useClusterServers()
.setSubscriptionMode(subscriptionMode) .setSubscriptionMode(subscriptionMode);
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER);
for (RedisClusterSlave slave : nodes.getSlaves()) {
slave.setConfig("notify-keyspace-events", "KgE$");
}
for (RedisClusterMaster master : nodes.getMasters()) {
master.setConfig("notify-keyspace-events", "KgE$");
}
AtomicInteger executions = new AtomicInteger(); AtomicInteger executions = new AtomicInteger();
RPatternTopic topic = redisson.getPatternTopic("__keyevent@*:del", StringCodec.INSTANCE); RPatternTopic topic = redisson.getPatternTopic("__keyevent@*:del", StringCodec.INSTANCE);
@ -423,22 +384,39 @@ public class RedissonTopicPatternTest extends RedisDockerTest {
} }
}); });
process.getNodes().stream().filter(x -> master1.getPort() == x.getRedisServerPort()) List<ContainerState> masters = getMasterNodes(nds);
.forEach(x -> { stop(masters.get(0));
x.stop();
});
try {
Thread.sleep(40000); Thread.sleep(40000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
nodes = redisson.getRedisNodes(RedisNodes.CLUSTER);
for (RedisClusterSlave slave : nodes.getSlaves()) {
slave.setConfig("notify-keyspace-events", "KgE$");
}
for (RedisClusterMaster master : nodes.getMasters()) {
master.setConfig("notify-keyspace-events", "KgE$");
}
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
RBucket<Object> b = redisson.getBucket("test" + i); RBucket<Object> b = redisson.getBucket("test" + i);
b.set(i); b.set(i);
b.delete(); b.delete();
} }
try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(executions.get()).isEqualTo(100); assertThat(executions.get()).isEqualTo(100);
redisson.shutdown(); redisson.shutdown();
});
} }
@Test @Test

Loading…
Cancel
Save