|
|
|
@ -2,15 +2,14 @@ package org.redisson;
|
|
|
|
|
|
|
|
|
|
import org.junit.jupiter.api.Assertions;
|
|
|
|
|
import org.junit.jupiter.api.Test;
|
|
|
|
|
import org.redisson.RedisRunner.RedisProcess;
|
|
|
|
|
import org.redisson.api.NameMapper;
|
|
|
|
|
import org.redisson.api.RBoundedBlockingQueue;
|
|
|
|
|
import org.redisson.api.RFuture;
|
|
|
|
|
import org.redisson.api.RedissonClient;
|
|
|
|
|
import org.redisson.client.RedisException;
|
|
|
|
|
import org.redisson.config.Config;
|
|
|
|
|
import org.testcontainers.containers.GenericContainer;
|
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.time.Duration;
|
|
|
|
|
import java.util.*;
|
|
|
|
|
import java.util.concurrent.*;
|
|
|
|
@ -20,11 +19,11 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
import static org.assertj.core.api.Assertions.assertThat;
|
|
|
|
|
import static org.awaitility.Awaitility.await;
|
|
|
|
|
|
|
|
|
|
public class RedissonBoundedBlockingQueueTest extends BaseTest {
|
|
|
|
|
public class RedissonBoundedBlockingQueueTest extends RedisDockerTest {
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testNameMapper() {
|
|
|
|
|
Config config = new Config();
|
|
|
|
|
Config config = createConfig();
|
|
|
|
|
config.useSingleServer()
|
|
|
|
|
.setNameMapper(new NameMapper() {
|
|
|
|
|
@Override
|
|
|
|
@ -36,8 +35,7 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
|
|
|
|
|
public String unmap(String name) {
|
|
|
|
|
return name.replace(":suffix:", "");
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
RedissonClient redisson = Redisson.create(config);
|
|
|
|
|
RBoundedBlockingQueue<Integer> queue = redisson.getBoundedBlockingQueue("bounded-queue");
|
|
|
|
@ -243,17 +241,15 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testPollWithBrokenConnection() throws IOException, InterruptedException, ExecutionException {
|
|
|
|
|
RedisProcess runner = new RedisRunner()
|
|
|
|
|
.nosave()
|
|
|
|
|
.randomDir()
|
|
|
|
|
.randomPort()
|
|
|
|
|
.run();
|
|
|
|
|
|
|
|
|
|
public void testPollWithBrokenConnection() throws InterruptedException, ExecutionException {
|
|
|
|
|
GenericContainer<?> redis = createRedis();
|
|
|
|
|
redis.start();
|
|
|
|
|
|
|
|
|
|
Config config = new Config();
|
|
|
|
|
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
|
|
|
|
|
config.useSingleServer().setAddress("redis://" + redis.getHost() + ":" + redis.getFirstMappedPort());
|
|
|
|
|
RedissonClient redisson = Redisson.create(config);
|
|
|
|
|
final RBoundedBlockingQueue<Integer> queue1 = redisson.getBoundedBlockingQueue("bounded-queue:pollTimeout");
|
|
|
|
|
|
|
|
|
|
RBoundedBlockingQueue<Integer> queue1 = redisson.getBoundedBlockingQueue("bounded-queue:pollTimeout");
|
|
|
|
|
assertThat(queue1.trySetCapacity(5)).isTrue();
|
|
|
|
|
RFuture<Integer> f = queue1.pollAsync(5, TimeUnit.SECONDS);
|
|
|
|
|
|
|
|
|
@ -263,7 +259,7 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
|
|
|
|
|
} catch (TimeoutException e) {
|
|
|
|
|
// skip
|
|
|
|
|
}
|
|
|
|
|
runner.stop();
|
|
|
|
|
redis.stop();
|
|
|
|
|
|
|
|
|
|
long start = System.currentTimeMillis();
|
|
|
|
|
assertThat(f.get()).isNull();
|
|
|
|
@ -273,18 +269,14 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testPollReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException {
|
|
|
|
|
RedisProcess runner = new RedisRunner()
|
|
|
|
|
.nosave()
|
|
|
|
|
.randomDir()
|
|
|
|
|
.randomPort()
|
|
|
|
|
.run();
|
|
|
|
|
|
|
|
|
|
public void testPollReattach() throws InterruptedException {
|
|
|
|
|
GenericContainer<?> redis = createRedis();
|
|
|
|
|
redis.start();
|
|
|
|
|
|
|
|
|
|
Config config = new Config();
|
|
|
|
|
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
|
|
|
|
|
config.useSingleServer().setAddress("redis://" + redis.getHost() + ":" + redis.getFirstMappedPort());
|
|
|
|
|
RedissonClient redisson = Redisson.create(config);
|
|
|
|
|
redisson.getKeys().flushall();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
final AtomicBoolean executed = new AtomicBoolean();
|
|
|
|
|
|
|
|
|
|
Thread t = new Thread() {
|
|
|
|
@ -306,14 +298,11 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
|
|
|
|
|
|
|
|
|
|
t.start();
|
|
|
|
|
t.join(1000);
|
|
|
|
|
runner.stop();
|
|
|
|
|
|
|
|
|
|
runner = new RedisRunner()
|
|
|
|
|
.port(runner.getRedisServerPort())
|
|
|
|
|
.nosave()
|
|
|
|
|
.randomDir()
|
|
|
|
|
.run();
|
|
|
|
|
|
|
|
|
|
redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":6379"));
|
|
|
|
|
redis.stop();
|
|
|
|
|
redis.start();
|
|
|
|
|
|
|
|
|
|
Thread.sleep(1000);
|
|
|
|
|
|
|
|
|
|
RBoundedBlockingQueue<Integer> queue1 = redisson.getBoundedBlockingQueue("queue:pollany");
|
|
|
|
@ -325,19 +314,16 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
|
|
|
|
|
await().atMost(5, TimeUnit.SECONDS).untilTrue(executed);
|
|
|
|
|
|
|
|
|
|
redisson.shutdown();
|
|
|
|
|
runner.stop();
|
|
|
|
|
redis.stop();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testPollAsyncReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException {
|
|
|
|
|
RedisProcess runner = new RedisRunner()
|
|
|
|
|
.nosave()
|
|
|
|
|
.randomDir()
|
|
|
|
|
.randomPort()
|
|
|
|
|
.run();
|
|
|
|
|
public void testPollAsyncReattach() throws InterruptedException, ExecutionException, TimeoutException {
|
|
|
|
|
GenericContainer<?> redis = createRedis();
|
|
|
|
|
redis.start();
|
|
|
|
|
|
|
|
|
|
Config config = new Config();
|
|
|
|
|
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
|
|
|
|
|
config.useSingleServer().setAddress("redis://" + redis.getHost() + ":" + redis.getFirstMappedPort());
|
|
|
|
|
RedissonClient redisson = Redisson.create(config);
|
|
|
|
|
|
|
|
|
|
RBoundedBlockingQueue<Integer> queue1 = redisson.getBoundedBlockingQueue("queue:pollany");
|
|
|
|
@ -347,13 +333,11 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
|
|
|
|
|
} catch (ExecutionException | TimeoutException e) {
|
|
|
|
|
// skip
|
|
|
|
|
}
|
|
|
|
|
runner.stop();
|
|
|
|
|
|
|
|
|
|
runner = new RedisRunner()
|
|
|
|
|
.port(runner.getRedisServerPort())
|
|
|
|
|
.nosave()
|
|
|
|
|
.randomDir()
|
|
|
|
|
.run();
|
|
|
|
|
redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":6379"));
|
|
|
|
|
redis.stop();
|
|
|
|
|
redis.start();
|
|
|
|
|
|
|
|
|
|
assertThat(queue1.trySetCapacity(15)).isTrue();
|
|
|
|
|
queue1.put(123);
|
|
|
|
|
|
|
|
|
@ -367,20 +351,17 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
|
|
|
|
|
assertThat(result).isEqualTo(123);
|
|
|
|
|
|
|
|
|
|
redisson.shutdown();
|
|
|
|
|
runner.stop();
|
|
|
|
|
redis.stop();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testTakeReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException {
|
|
|
|
|
RedisProcess runner = new RedisRunner()
|
|
|
|
|
.nosave()
|
|
|
|
|
.randomDir()
|
|
|
|
|
.randomPort()
|
|
|
|
|
.run();
|
|
|
|
|
public void testTakeReattach() throws InterruptedException, ExecutionException, TimeoutException {
|
|
|
|
|
GenericContainer<?> redis = createRedis();
|
|
|
|
|
redis.start();
|
|
|
|
|
|
|
|
|
|
Config config = new Config();
|
|
|
|
|
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
|
|
|
|
|
config.useSingleServer().setAddress("redis://" + redis.getHost() + ":" + redis.getFirstMappedPort());
|
|
|
|
|
RedissonClient redisson = Redisson.create(config);
|
|
|
|
|
redisson.getKeys().flushall();
|
|
|
|
|
|
|
|
|
@ -392,13 +373,11 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
|
|
|
|
|
} catch (ExecutionException | TimeoutException e) {
|
|
|
|
|
// skip
|
|
|
|
|
}
|
|
|
|
|
runner.stop();
|
|
|
|
|
|
|
|
|
|
runner = new RedisRunner()
|
|
|
|
|
.port(runner.getRedisServerPort())
|
|
|
|
|
.nosave()
|
|
|
|
|
.randomDir()
|
|
|
|
|
.run();
|
|
|
|
|
redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":6379"));
|
|
|
|
|
redis.stop();
|
|
|
|
|
redis.start();
|
|
|
|
|
|
|
|
|
|
assertThat(queue1.trySetCapacity(15)).isTrue();
|
|
|
|
|
queue1.put(123);
|
|
|
|
|
|
|
|
|
@ -410,9 +389,9 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
|
|
|
|
|
|
|
|
|
|
Integer result = f.get(1, TimeUnit.SECONDS);
|
|
|
|
|
assertThat(result).isEqualTo(123);
|
|
|
|
|
runner.stop();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
redisson.shutdown();
|
|
|
|
|
redis.stop();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|