|
|
|
@ -18,6 +18,7 @@ import java.util.concurrent.Future;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
|
|
|
|
import org.awaitility.Duration;
|
|
|
|
|
import org.junit.After;
|
|
|
|
@ -151,7 +152,9 @@ public class RedissonTopicTest {
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(count);
|
|
|
|
|
|
|
|
|
|
RTopic eventsTopic = redisson.getTopic("eventsTopic");
|
|
|
|
|
AtomicInteger co = new AtomicInteger();
|
|
|
|
|
eventsTopic.addListener(String.class, (channel, msg) -> {
|
|
|
|
|
co.incrementAndGet();
|
|
|
|
|
latch.countDown();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
@ -161,7 +164,7 @@ public class RedissonTopicTest {
|
|
|
|
|
Thread.sleep(10);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
|
|
|
|
|
assertThat(latch.await(40, TimeUnit.SECONDS)).isTrue();
|
|
|
|
|
|
|
|
|
|
redisson.shutdown();
|
|
|
|
|
}
|
|
|
|
@ -201,7 +204,7 @@ public class RedissonTopicTest {
|
|
|
|
|
futures.add(s);
|
|
|
|
|
}
|
|
|
|
|
executor.shutdown();
|
|
|
|
|
Assert.assertTrue(executor.awaitTermination(threads * loops * 1000, TimeUnit.SECONDS));
|
|
|
|
|
Assert.assertTrue(executor.awaitTermination(100, TimeUnit.SECONDS));
|
|
|
|
|
|
|
|
|
|
for (Future<?> future : futures) {
|
|
|
|
|
future.get();
|
|
|
|
@ -561,18 +564,17 @@ public class RedissonTopicTest {
|
|
|
|
|
redisson2.shutdown();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
volatile long counter;
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testHeavyLoad() throws InterruptedException {
|
|
|
|
|
final CountDownLatch messageRecieved = new CountDownLatch(1000);
|
|
|
|
|
|
|
|
|
|
AtomicLong counter = new AtomicLong();
|
|
|
|
|
RedissonClient redisson1 = BaseTest.createInstance();
|
|
|
|
|
RTopic topic1 = redisson1.getTopic("topic");
|
|
|
|
|
topic1.addListener(Message.class, (channel, msg) -> {
|
|
|
|
|
Assert.assertEquals(new Message("123"), msg);
|
|
|
|
|
messageRecieved.countDown();
|
|
|
|
|
counter++;
|
|
|
|
|
counter.incrementAndGet();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
RedissonClient redisson2 = BaseTest.createInstance();
|
|
|
|
@ -582,7 +584,8 @@ public class RedissonTopicTest {
|
|
|
|
|
messageRecieved.countDown();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < 5000; i++) {
|
|
|
|
|
int count = 10000;
|
|
|
|
|
for (int i = 0; i < count; i++) {
|
|
|
|
|
topic2.publish(new Message("123"));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -590,7 +593,7 @@ public class RedissonTopicTest {
|
|
|
|
|
|
|
|
|
|
Thread.sleep(1000);
|
|
|
|
|
|
|
|
|
|
Assert.assertEquals(5000, counter);
|
|
|
|
|
Assert.assertEquals(count, counter.get());
|
|
|
|
|
|
|
|
|
|
redisson1.shutdown();
|
|
|
|
|
redisson2.shutdown();
|
|
|
|
|