refactoring

pull/5457/head
Nikita Koksharov 1 year ago
parent 4c44e1b588
commit 69228326c3

@ -1,9 +1,10 @@
package org.redisson;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.RBucket;
import org.redisson.api.RPatternTopic;
import org.redisson.api.RTopic;
@ -11,12 +12,16 @@ import org.redisson.api.RedissonClient;
import org.redisson.api.listener.BasePatternStatusListener;
import org.redisson.api.listener.PatternMessageListener;
import org.redisson.api.listener.PatternStatusListener;
import org.redisson.api.redisnode.RedisCluster;
import org.redisson.api.redisnode.RedisClusterMaster;
import org.redisson.api.redisnode.RedisClusterSlave;
import org.redisson.api.redisnode.RedisNodes;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.testcontainers.containers.GenericContainer;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
@ -28,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
public class RedissonTopicPatternTest extends BaseTest {
public class RedissonTopicPatternTest extends RedisDockerTest {
public static class Message implements Serializable {
@ -69,127 +74,95 @@ public class RedissonTopicPatternTest extends BaseTest {
}
@Test
public void testCluster() throws IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
public void testCluster() {
testInCluster(redisson -> {
RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER);
for (RedisClusterSlave slave : nodes.getSlaves()) {
slave.setConfig("notify-keyspace-events", "Eg");
}
for (RedisClusterMaster master : nodes.getMasters()) {
master.setConfig("notify-keyspace-events", "Eg");
}
Thread.sleep(3000);
AtomicInteger subscribeCounter = new AtomicInteger();
RPatternTopic topic = redisson.getPatternTopic("__keyevent@*", StringCodec.INSTANCE);
topic.addListener(new PatternStatusListener() {
@Override
public void onPSubscribe(String pattern) {
subscribeCounter.incrementAndGet();
}
Config config = new Config();
config.useClusterServers()
.setPingConnectionInterval(0)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
@Override
public void onPUnsubscribe(String pattern) {
System.out.println("onPUnsubscribe: " + pattern);
}
});
AtomicInteger subscribeCounter = new AtomicInteger();
RPatternTopic topic = redisson.getPatternTopic("__keyevent@*", StringCodec.INSTANCE);
topic.addListener(new PatternStatusListener() {
@Override
public void onPSubscribe(String pattern) {
subscribeCounter.incrementAndGet();
}
AtomicInteger counter = new AtomicInteger();
@Override
public void onPUnsubscribe(String pattern) {
System.out.println("onPUnsubscribe: " + pattern);
PatternMessageListener<String> listener = (pattern, channel, msg) -> {
System.out.println("mes " + channel + " counter " + counter.get());
counter.incrementAndGet();
};
topic.addListener(String.class, listener);
for (int i = 0; i < 10; i++) {
redisson.getBucket("" + i).set(i);
redisson.getBucket("" + i).delete();
try {
Thread.sleep(7);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
AtomicInteger counter = new AtomicInteger();
PatternMessageListener<String> listener = (pattern, channel, msg) -> {
System.out.println("mes " + channel + " counter " + counter.get());
counter.incrementAndGet();
};
topic.addListener(String.class, listener);
Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> counter.get() > 9);
assertThat(subscribeCounter.get()).isEqualTo(1);
for (int i = 0; i < 10; i++) {
redisson.getBucket("" + i).set(i);
redisson.getBucket("" + i).delete();
Thread.sleep(7);
}
Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> counter.get() > 9);
assertThat(subscribeCounter.get()).isEqualTo(1);
for (RedisClusterSlave slave : nodes.getSlaves()) {
slave.setConfig("notify-keyspace-events", "");
}
for (RedisClusterMaster master : nodes.getMasters()) {
master.setConfig("notify-keyspace-events", "");
}
redisson.shutdown();
process.shutdown();
topic.removeAllListeners();
});
}
@Test
public void testNonEventMessagesInCluster() throws IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
public void testNonEventMessagesInCluster() {
testInCluster(redisson -> {
AtomicInteger subscribeCounter = new AtomicInteger();
RPatternTopic topic = redisson.getPatternTopic("my*", StringCodec.INSTANCE);
topic.addListener(new PatternStatusListener() {
@Override
public void onPSubscribe(String pattern) {
subscribeCounter.incrementAndGet();
}
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
@Override
public void onPUnsubscribe(String pattern) {
System.out.println("onPUnsubscribe: " + pattern);
}
});
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
AtomicInteger counter = new AtomicInteger();
AtomicInteger subscribeCounter = new AtomicInteger();
RPatternTopic topic = redisson.getPatternTopic("my*", StringCodec.INSTANCE);
topic.addListener(new PatternStatusListener() {
@Override
public void onPSubscribe(String pattern) {
subscribeCounter.incrementAndGet();
}
PatternMessageListener<String> listener = (pattern, channel, msg) -> {
counter.incrementAndGet();
};
topic.addListener(String.class, listener);
@Override
public void onPUnsubscribe(String pattern) {
System.out.println("onPUnsubscribe: " + pattern);
for (int i = 0; i < 100; i++) {
redisson.getTopic("my" + i).publish(123);
}
});
AtomicInteger counter = new AtomicInteger();
PatternMessageListener<String> listener = (pattern, channel, msg) -> {
counter.incrementAndGet();
};
topic.addListener(String.class, listener);
for (int i = 0; i < 100; i++) {
redisson.getTopic("my" + i).publish(123);
}
Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> counter.get() == 100);
assertThat(subscribeCounter.get()).isEqualTo(1);
Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> counter.get() == 100);
assertThat(subscribeCounter.get()).isEqualTo(1);
redisson.shutdown();
process.shutdown();
topic.removeAllListeners();
});
}
@Test
@ -211,6 +184,7 @@ public class RedissonTopicPatternTest extends BaseTest {
Assertions.assertEquals(1, i.get());
Assertions.assertEquals(1, str.get());
topic1.removeAllListeners();
}
@Test
@ -232,14 +206,14 @@ public class RedissonTopicPatternTest extends BaseTest {
redisson.getTopic("topic1.t3").publish(new Message("123"));
Assertions.assertTrue(messageRecieved.await(5, TimeUnit.SECONDS));
topic1.removeAllListeners();
}
@Test
public void testLazyUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
RedissonClient redisson1 = BaseTest.createInstance();
RPatternTopic topic1 = redisson1.getPatternTopic("topic.*");
RPatternTopic topic1 = redisson.getPatternTopic("topic.*");
int listenerId = topic1.addListener(Message.class, (pattern, channel, msg) -> {
Assertions.fail();
});
@ -248,7 +222,7 @@ public class RedissonTopicPatternTest extends BaseTest {
topic1.removeListener(listenerId);
Thread.sleep(1000);
RedissonClient redisson2 = BaseTest.createInstance();
RedissonClient redisson2 = createInstance();
RPatternTopic topic2 = redisson2.getPatternTopic("topic.*");
topic2.addListener(Message.class, (pattern, channel, msg) -> {
Assertions.assertTrue(pattern.equals("topic.*"));
@ -262,7 +236,7 @@ public class RedissonTopicPatternTest extends BaseTest {
Assertions.assertTrue(messageRecieved.await(5, TimeUnit.SECONDS));
redisson1.shutdown();
topic1.removeAllListeners();
redisson2.shutdown();
}
@ -271,8 +245,7 @@ public class RedissonTopicPatternTest extends BaseTest {
final CountDownLatch messageRecieved = new CountDownLatch(5);
final CountDownLatch statusRecieved = new CountDownLatch(1);
RedissonClient redisson1 = BaseTest.createInstance();
RPatternTopic topic1 = redisson1.getPatternTopic("topic.*");
RPatternTopic topic1 = redisson.getPatternTopic("topic.*");
topic1.addListener(new BasePatternStatusListener() {
@Override
public void onPSubscribe(String pattern) {
@ -285,7 +258,7 @@ public class RedissonTopicPatternTest extends BaseTest {
messageRecieved.countDown();
});
RedissonClient redisson2 = BaseTest.createInstance();
RedissonClient redisson2 = createInstance();
RTopic topic2 = redisson2.getTopic("topic.t1");
topic2.addListener(Message.class, (channel, msg) -> {
Assertions.assertEquals(new Message("123"), msg);
@ -305,14 +278,13 @@ public class RedissonTopicPatternTest extends BaseTest {
statusRecieved.await();
Assertions.assertTrue(messageRecieved.await(5, TimeUnit.SECONDS));
redisson1.shutdown();
topic1.removeAllListeners();
redisson2.shutdown();
}
@Test
public void testListenerRemove() throws InterruptedException {
RedissonClient redisson1 = BaseTest.createInstance();
RPatternTopic topic1 = redisson1.getPatternTopic("topic.*");
public void testListenerRemove() {
RPatternTopic topic1 = redisson.getPatternTopic("topic.*");
final CountDownLatch l = new CountDownLatch(1);
topic1.addListener(new BasePatternStatusListener() {
@Override
@ -325,19 +297,19 @@ public class RedissonTopicPatternTest extends BaseTest {
Assertions.fail();
});
RedissonClient redisson2 = BaseTest.createInstance();
RedissonClient redisson2 = createInstance();
RTopic topic2 = redisson2.getTopic("topic.t1");
topic1.removeListener(id);
topic2.publish(new Message("123"));
redisson1.shutdown();
topic1.removeAllListeners();
redisson2.shutdown();
}
@Test
public void testConcurrentTopic() throws Exception {
int threads = 30;
int loops = 50000;
int threads = 16;
int loops = 25000;
ExecutorService executor = Executors.newFixedThreadPool(threads);
List<Future<?>> futures = new ArrayList<>();
@ -372,6 +344,9 @@ public class RedissonTopicPatternTest extends BaseTest {
for (Future<?> future : futures) {
future.get();
}
RPatternTopic t = redisson.getPatternTopic("PUBSUB*");
t.removeAllListeners();
}
@Test
@ -462,19 +437,23 @@ public class RedissonTopicPatternTest extends BaseTest {
assertThat(executions.get()).isEqualTo(100);
redisson.shutdown();
process.shutdown();
}
@Test
public void testReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException {
RedisProcess runner = new RedisRunner()
.nosave()
.randomDir()
.randomPort()
.run();
public void testReattach() throws InterruptedException {
GenericContainer<?> redis =
new GenericContainer<>("redis:7.2")
.withExposedPorts(6379);
redis.start();
int port = redis.getFirstMappedPort();
redis.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(port)),
cmd.getExposedPorts()[0]));
});
Config config = new Config();
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
config.useSingleServer().setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort());
RedissonClient redisson = Redisson.create(config);
final AtomicBoolean executed = new AtomicBoolean();
@ -488,15 +467,10 @@ public class RedissonTopicPatternTest extends BaseTest {
}
}
});
runner.stop();
runner = new RedisRunner()
.port(runner.getRedisServerPort())
.nosave()
.randomDir()
.run();
redis.stop();
redis.start();
Thread.sleep(1000);
redisson.getTopic("topic1").publish(1);
@ -504,7 +478,7 @@ public class RedissonTopicPatternTest extends BaseTest {
await().atMost(5, TimeUnit.SECONDS).untilTrue(executed);
redisson.shutdown();
runner.stop();
redis.stop();
}
}

Loading…
Cancel
Save