|
|
|
@ -204,7 +204,7 @@ public class RedissonTopicTest {
|
|
|
|
|
futures.add(s);
|
|
|
|
|
}
|
|
|
|
|
executor.shutdown();
|
|
|
|
|
Assertions.assertTrue(executor.awaitTermination(120, TimeUnit.SECONDS));
|
|
|
|
|
assertThat(executor.awaitTermination(120, TimeUnit.SECONDS)).isTrue();
|
|
|
|
|
|
|
|
|
|
for (Future<?> future : futures) {
|
|
|
|
|
future.get();
|
|
|
|
@ -321,7 +321,7 @@ public class RedissonTopicTest {
|
|
|
|
|
final RTopic topic1 = redisson1.getTopic("topic1");
|
|
|
|
|
final CountDownLatch messageRecieved = new CountDownLatch(3);
|
|
|
|
|
int listenerId = topic1.addListener(Message.class, (channel, msg) -> {
|
|
|
|
|
Assertions.assertEquals(msg, new Message("test"));
|
|
|
|
|
assertThat(msg).isEqualTo(new Message("test"));
|
|
|
|
|
messageRecieved.countDown();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
@ -337,7 +337,7 @@ public class RedissonTopicTest {
|
|
|
|
|
});
|
|
|
|
|
topic2.publish(new Message("123"));
|
|
|
|
|
|
|
|
|
|
Assertions.assertTrue(messageRecieved.await(5, TimeUnit.SECONDS));
|
|
|
|
|
assertThat(messageRecieved.await(5, TimeUnit.SECONDS)).isTrue();
|
|
|
|
|
|
|
|
|
|
redisson1.shutdown();
|
|
|
|
|
redisson2.shutdown();
|
|
|
|
@ -351,7 +351,7 @@ public class RedissonTopicTest {
|
|
|
|
|
int listenerId = topic1.addListener(new BaseStatusListener() {
|
|
|
|
|
@Override
|
|
|
|
|
public void onSubscribe(String channel) {
|
|
|
|
|
Assertions.assertEquals("topic1", channel);
|
|
|
|
|
assertThat(channel).isEqualTo("topic1");
|
|
|
|
|
l.countDown();
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
@ -361,14 +361,14 @@ public class RedissonTopicTest {
|
|
|
|
|
int listenerId2 = topic1.addListener(new BaseStatusListener() {
|
|
|
|
|
@Override
|
|
|
|
|
public void onUnsubscribe(String channel) {
|
|
|
|
|
Assertions.assertEquals("topic1", channel);
|
|
|
|
|
assertThat(channel).isEqualTo("topic1");
|
|
|
|
|
l.countDown();
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
topic1.removeListener(listenerId);
|
|
|
|
|
topic1.removeListener(listenerId2);
|
|
|
|
|
|
|
|
|
|
Assertions.assertTrue(l.await(5, TimeUnit.SECONDS));
|
|
|
|
|
|
|
|
|
|
assertThat(l.await(5, TimeUnit.SECONDS)).isTrue();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
@ -475,8 +475,8 @@ public class RedissonTopicTest {
|
|
|
|
|
Assertions.fail();
|
|
|
|
|
});
|
|
|
|
|
topic1.addListener(Message.class, (channel, msg) -> {
|
|
|
|
|
Assertions.assertEquals("topic1", channel.toString());
|
|
|
|
|
Assertions.assertEquals(new Message("123"), msg);
|
|
|
|
|
assertThat(channel.toString()).isEqualTo("topic1");
|
|
|
|
|
assertThat(msg).isEqualTo(new Message("123"));
|
|
|
|
|
messageRecieved.countDown();
|
|
|
|
|
});
|
|
|
|
|
topic1.removeListener(listenerId);
|
|
|
|
@ -484,7 +484,7 @@ public class RedissonTopicTest {
|
|
|
|
|
topic1 = redisson.getTopic("topic1");
|
|
|
|
|
topic1.publish(new Message("123"));
|
|
|
|
|
|
|
|
|
|
Assertions.assertTrue(messageRecieved.await(5, TimeUnit.SECONDS));
|
|
|
|
|
assertThat(messageRecieved.await(5, TimeUnit.SECONDS)).isTrue();
|
|
|
|
|
|
|
|
|
|
redisson.shutdown();
|
|
|
|
|
}
|
|
|
|
@ -620,12 +620,12 @@ public class RedissonTopicTest {
|
|
|
|
|
RedissonClient redisson2 = BaseTest.createInstance();
|
|
|
|
|
RTopic topic2 = redisson2.getTopic("topic");
|
|
|
|
|
topic2.addListener(Message.class, (channel, msg) -> {
|
|
|
|
|
Assertions.assertEquals(new Message("123"), msg);
|
|
|
|
|
assertThat(msg).isEqualTo(new Message("123"));
|
|
|
|
|
messageRecieved.countDown();
|
|
|
|
|
});
|
|
|
|
|
topic2.publish(new Message("123"));
|
|
|
|
|
|
|
|
|
|
Assertions.assertTrue(messageRecieved.await(5, TimeUnit.SECONDS));
|
|
|
|
|
assertThat(messageRecieved.await(5, TimeUnit.SECONDS)).isTrue();
|
|
|
|
|
|
|
|
|
|
redisson1.shutdown();
|
|
|
|
|
redisson2.shutdown();
|
|
|
|
@ -638,14 +638,14 @@ public class RedissonTopicTest {
|
|
|
|
|
RedissonClient redisson1 = BaseTest.createInstance();
|
|
|
|
|
RTopic topic1 = redisson1.getTopic("topic");
|
|
|
|
|
topic1.addListener(Message.class, (channel, msg) -> {
|
|
|
|
|
Assertions.assertEquals(new Message("123"), msg);
|
|
|
|
|
assertThat(msg).isEqualTo(new Message("123"));
|
|
|
|
|
messageRecieved.countDown();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
RedissonClient redisson2 = BaseTest.createInstance();
|
|
|
|
|
RTopic topic2 = redisson2.getTopic("topic");
|
|
|
|
|
topic2.addListener(Message.class, (channel, msg) -> {
|
|
|
|
|
Assertions.assertEquals(new Message("123"), msg);
|
|
|
|
|
assertThat(msg).isEqualTo(new Message("123"));
|
|
|
|
|
messageRecieved.countDown();
|
|
|
|
|
});
|
|
|
|
|
topic2.publish(new Message("123"));
|
|
|
|
@ -664,7 +664,7 @@ public class RedissonTopicTest {
|
|
|
|
|
RedissonClient redisson1 = BaseTest.createInstance();
|
|
|
|
|
RTopic topic1 = redisson1.getTopic("topic");
|
|
|
|
|
topic1.addListener(Message.class, (channel, msg) -> {
|
|
|
|
|
Assertions.assertEquals(new Message("123"), msg);
|
|
|
|
|
assertThat(msg).isEqualTo(new Message("123"));
|
|
|
|
|
messageRecieved.countDown();
|
|
|
|
|
counter.incrementAndGet();
|
|
|
|
|
});
|
|
|
|
@ -672,7 +672,7 @@ public class RedissonTopicTest {
|
|
|
|
|
RedissonClient redisson2 = BaseTest.createInstance();
|
|
|
|
|
RTopic topic2 = redisson2.getTopic("topic");
|
|
|
|
|
topic2.addListener(Message.class, (channel, msg) -> {
|
|
|
|
|
Assertions.assertEquals(new Message("123"), msg);
|
|
|
|
|
assertThat(msg).isEqualTo(new Message("123"));
|
|
|
|
|
messageRecieved.countDown();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
@ -685,7 +685,7 @@ public class RedissonTopicTest {
|
|
|
|
|
|
|
|
|
|
Thread.sleep(1000);
|
|
|
|
|
|
|
|
|
|
Assertions.assertEquals(count, counter.get());
|
|
|
|
|
assertThat(count).isEqualTo(counter.get());
|
|
|
|
|
|
|
|
|
|
redisson1.shutdown();
|
|
|
|
|
redisson2.shutdown();
|
|
|
|
@ -882,7 +882,7 @@ public class RedissonTopicTest {
|
|
|
|
|
System.out.println("Failover Finished, start to see Subscribe timeouts now. Can't recover this without a refresh of redison client ");
|
|
|
|
|
Thread.sleep(java.time.Duration.ofSeconds(10).toMillis());
|
|
|
|
|
|
|
|
|
|
Assertions.assertFalse(exceptionDetected.get());
|
|
|
|
|
assertThat(exceptionDetected.get()).isFalse();
|
|
|
|
|
|
|
|
|
|
executor1.shutdownNow();
|
|
|
|
|
|
|
|
|
@ -1020,8 +1020,8 @@ public class RedissonTopicTest {
|
|
|
|
|
redisson.getTopic("topic").publish(1);
|
|
|
|
|
|
|
|
|
|
await().atMost(20, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
|
|
|
|
|
Assertions.assertTrue(executed.get());
|
|
|
|
|
|
|
|
|
|
assertThat(executed.get()).isTrue();
|
|
|
|
|
|
|
|
|
|
redisson.shutdown();
|
|
|
|
|
sentinel1.stop();
|
|
|
|
|
sentinel2.stop();
|
|
|
|
@ -1178,7 +1178,7 @@ public class RedissonTopicTest {
|
|
|
|
|
redisson.getTopic("topic").publish(1);
|
|
|
|
|
|
|
|
|
|
await().atMost(20, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
|
|
|
|
|
Assertions.assertTrue(executed.get());
|
|
|
|
|
assertThat(executed.get()).isTrue();
|
|
|
|
|
|
|
|
|
|
redisson.shutdown();
|
|
|
|
|
sentinel1.stop();
|
|
|
|
@ -1301,6 +1301,7 @@ public class RedissonTopicTest {
|
|
|
|
|
executed.set(true);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
assertThat(topic.countListeners()).isEqualTo(2);
|
|
|
|
|
|
|
|
|
|
sendCommands(redisson, "topic");
|
|
|
|
|
|
|
|
|
@ -1320,7 +1321,8 @@ public class RedissonTopicTest {
|
|
|
|
|
redisson.getTopic("topic").publish(1);
|
|
|
|
|
|
|
|
|
|
await().atMost(75, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
|
|
|
|
|
Assertions.assertTrue(executed.get());
|
|
|
|
|
assertThat(topic.countListeners()).isEqualTo(2);
|
|
|
|
|
assertThat(executed.get()).isTrue();
|
|
|
|
|
|
|
|
|
|
redisson.shutdown();
|
|
|
|
|
process.shutdown();
|
|
|
|
@ -1389,7 +1391,7 @@ public class RedissonTopicTest {
|
|
|
|
|
redisson.getTopic("3").publish(1);
|
|
|
|
|
|
|
|
|
|
await().atMost(75, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
|
|
|
|
|
Assertions.assertTrue(executed.get());
|
|
|
|
|
assertThat(executed.get()).isTrue();
|
|
|
|
|
|
|
|
|
|
redisson.shutdown();
|
|
|
|
|
process.shutdown();
|
|
|
|
@ -1429,7 +1431,7 @@ public class RedissonTopicTest {
|
|
|
|
|
});
|
|
|
|
|
topic.addListener(String.class,
|
|
|
|
|
(pattern, channel, msg) -> messagesReceived.incrementAndGet());
|
|
|
|
|
Assertions.assertEquals(1, subscriptions.get());
|
|
|
|
|
assertThat(subscriptions.get()).isEqualTo(1);
|
|
|
|
|
|
|
|
|
|
sendCommands(redisson, "dummy").join();
|
|
|
|
|
await().atMost(30, TimeUnit.SECONDS).until(() -> messagesReceived.get() == 100);
|
|
|
|
|