refactoring

pull/5468/head
Nikita Koksharov 1 year ago
parent 391e58c85b
commit 7e5ed06fc4

@ -8,8 +8,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.api.*;
import org.redisson.api.BatchOptions.ExecutionMode;
import org.redisson.client.RedisClient;
@ -26,7 +24,6 @@ import org.redisson.misc.RedisURI;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
import java.io.IOException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
@ -37,7 +34,7 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonBatchTest extends BaseTest {
public class RedissonBatchTest extends RedisDockerTest {
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
@ -146,62 +143,46 @@ public class RedissonBatchTest extends BaseTest {
@ParameterizedTest
@MethodSource("data")
public void testConnectionLeak(BatchOptions batchOptions) throws Exception {
public void testConnectionLeak(BatchOptions batchOptions) {
Assumptions.assumeTrue(batchOptions.getExecutionMode() == ExecutionMode.IN_MEMORY);
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave();
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(1000);
Config config = new Config();
config.useClusterServers()
.setConnectTimeout(500).setPingConnectionInterval(2000)
.setMasterConnectionMinimumIdleSize(1)
.setMasterConnectionPoolSize(1)
.setSlaveConnectionMinimumIdleSize(1)
.setSlaveConnectionPoolSize(1)
.setTimeout(100)
.setRetryAttempts(0)
.setRetryInterval(20)
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
testInCluster(client -> {
Config config = client.getConfig();
config.useClusterServers()
.setConnectTimeout(500).setPingConnectionInterval(2000)
.setMasterConnectionMinimumIdleSize(1)
.setMasterConnectionPoolSize(1)
.setSlaveConnectionMinimumIdleSize(1)
.setSlaveConnectionPoolSize(1)
.setTimeout(100)
.setRetryAttempts(0)
.setRetryInterval(20);
RedissonClient redisson = Redisson.create(config);
ExecutorService executorService = Executors.newFixedThreadPool(5);
AtomicInteger counter = new AtomicInteger(5*15);
AtomicBoolean hasErrors = new AtomicBoolean();
for (int i = 0; i < 5; i++) {
executorService.submit(() -> {
for (int j = 0 ; j < 15; j++) {
executeBatch(redisson, batchOptions).whenComplete((r, e) -> {
if (e != null) {
hasErrors.set(true);
}
counter.decrementAndGet();
});
}
});
}
ExecutorService executorService = Executors.newFixedThreadPool(5);
AtomicInteger counter = new AtomicInteger(5*15);
AtomicBoolean hasErrors = new AtomicBoolean();
for (int i = 0; i < 5; i++) {
executorService.submit(() -> {
for (int j = 0 ; j < 15; j++) {
executeBatch(redisson, batchOptions).whenComplete((r, e) -> {
if (e != null) {
hasErrors.set(true);
}
counter.decrementAndGet();
});
}
});
}
Awaitility.await().atMost(13, TimeUnit.SECONDS).until(() -> {
return counter.get() == 0;
});
Assertions.assertThat(hasErrors).isTrue();
Awaitility.await().atMost(13, TimeUnit.SECONDS).until(() -> {
return counter.get() == 0;
executeBatch(redisson, batchOptions).toCompletableFuture().join();
redisson.shutdown();
});
Assertions.assertThat(hasErrors).isTrue();
executeBatch(redisson, batchOptions).toCompletableFuture().join();
redisson.shutdown();
process.shutdown();
}
public RFuture<BatchResult<?>> executeBatch(RedissonClient client, BatchOptions batchOptions) {
@ -223,25 +204,25 @@ public class RedissonBatchTest extends BaseTest {
batch.execute();
assertThat(f1.get()).isEqualTo(1d);
assertThat(f2.get()).isNull();
RScoredSortedSet<String> set = redisson.getScoredSortedSet("myZKey");
assertThat(set.getScore("abc")).isEqualTo(1d);
RBucket<String> bucket = redisson.getBucket("test");
assertThat(bucket.get()).isEqualTo("1");
RBatch batch2 = redisson.createBatch(batchOptions);
RFuture<Double> b2f1 = batch2.getScoredSortedSet("myZKey2").addScoreAsync("abc", 1d);
RFuture<Double> b2f2 = batch2.getScoredSortedSet("myZKey2").addScoreAsync("abc", 1d);
batch2.execute();
assertThat(b2f1.get()).isEqualTo(1d);
assertThat(b2f2.get()).isEqualTo(2d);
}
@ParameterizedTest
@MethodSource("data")
@Timeout(20)
public void testPerformance() {
@Timeout(40)
public void testPerformance(BatchOptions batchOptions) {
RMap<String, String> map = redisson.getMap("map");
Map<String, String> m = new HashMap<String, String>();
for (int j = 0; j < 1000; j++) {
@ -250,7 +231,7 @@ public class RedissonBatchTest extends BaseTest {
map.putAll(m);
for (int i = 0; i < 10000; i++) {
RBatch rBatch = redisson.createBatch();
RBatch rBatch = redisson.createBatch(batchOptions);
RMapAsync<String, String> m1 = rBatch.getMap("map");
m1.getAllAsync(m.keySet());
try {
@ -304,7 +285,7 @@ public class RedissonBatchTest extends BaseTest {
.setConnectionMinimumIdleSize(1).setConnectionPoolSize(1);
RedissonClient redisson = Redisson.create(config);
BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
RBatch batch1 = redisson.createBatch(batchOptions);
for (int i = 0; i < 150000; i++) {
@ -320,18 +301,18 @@ public class RedissonBatchTest extends BaseTest {
redisson.getBucket("test3").set(4);
assertThat(redisson.getBucket("test3").get()).isEqualTo(4);
RBatch batch = redisson.createBatch(batchOptions);
batch.getBucket("test1").setAsync(1);
batch.getBucket("test2").setAsync(2);
batch.execute();
assertThat(redisson.getBucket("test1").get()).isEqualTo(1);
assertThat(redisson.getBucket("test2").get()).isEqualTo(2);
redisson.shutdown();
}
@ParameterizedTest
@MethodSource("data")
public void testBigRequestAtomic(BatchOptions batchOptions) {
@ -340,13 +321,13 @@ public class RedissonBatchTest extends BaseTest {
.responseTimeout(15, TimeUnit.SECONDS)
.retryInterval(1, TimeUnit.SECONDS)
.retryAttempts(5);
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 100; i++) {
batch.getBucket("" + i).setAsync(i);
batch.getBucket("" + i).getAsync();
}
BatchResult<?> s = batch.execute();
assertThat(s.getResponses().size()).isEqualTo(200);
}
@ -420,43 +401,29 @@ public class RedissonBatchTest extends BaseTest {
@ParameterizedTest
@MethodSource("data")
@Timeout(value = 20, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
public void testSyncSlaves(BatchOptions batchOptions) throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
public void testSyncSlaves(BatchOptions batchOptions) {
testInCluster(client -> {
Config config = client.getConfig();
config.useClusterServers()
.setTimeout(1000000)
.setRetryInterval(1000000);
RedissonClient redisson = Redisson.create(config);
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setTimeout(1000000)
.setRetryInterval(1000000)
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
batchOptions
.sync(1, Duration.ofSeconds(1));
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 100; i++) {
RMapAsync<String, String> map = batch.getMap("test");
map.putAsync("" + i, "" + i);
}
batchOptions
.sync(1, Duration.ofSeconds(1));
BatchResult<?> result = batch.execute();
assertThat(result.getResponses()).hasSize(100);
assertThat(result.getSyncedSlaves()).isEqualTo(1);
process.shutdown();
redisson.shutdown();
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 100; i++) {
RMapAsync<String, String> map = batch.getMap("test");
map.putAsync("" + i, "" + i);
}
BatchResult<?> result = batch.execute();
assertThat(result.getResponses()).hasSize(100);
assertThat(result.getSyncedSlaves()).isEqualTo(1);
redisson.shutdown();
});
}
@ParameterizedTest
@ -475,7 +442,7 @@ public class RedissonBatchTest extends BaseTest {
((BatchPromise)f.toCompletableFuture()).getSentPromise().join();
}
}
long s = System.currentTimeMillis();
batch.execute();
long executionTime = System.currentTimeMillis() - s;
@ -487,14 +454,11 @@ public class RedissonBatchTest extends BaseTest {
assertThat(redisson.getMapCache("test").size()).isEqualTo(total);
redisson.shutdown();
}
@ParameterizedTest
@MethodSource("data")
public void testSkipResult(BatchOptions batchOptions) {
Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("3.2.0") > 0);
batchOptions
.skipResult();
batchOptions.skipResult();
RBatch batch = redisson.createBatch(batchOptions);
batch.getBucket("A1").setAsync("001");
@ -503,11 +467,11 @@ public class RedissonBatchTest extends BaseTest {
batch.getKeys().deleteAsync("A1");
batch.getKeys().deleteAsync("A2");
batch.execute();
assertThat(redisson.getBucket("A1").isExists()).isFalse();
assertThat(redisson.getBucket("A3").isExists()).isTrue();
}
@ParameterizedTest
@MethodSource("data")
public void testBatchNPE(BatchOptions batchOptions) {
@ -525,14 +489,14 @@ public class RedissonBatchTest extends BaseTest {
public void testAtomic(BatchOptions batchOptions) {
batchOptions
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC);
RBatch batch = redisson.createBatch(batchOptions);
RFuture<Long> f1 = batch.getAtomicLong("A1").addAndGetAsync(1);
RFuture<Long> f2 = batch.getAtomicLong("A2").addAndGetAsync(2);
RFuture<Long> f3 = batch.getAtomicLong("A3").addAndGetAsync(3);
RFuture<Long> d1 = batch.getKeys().deleteAsync("A1", "A2");
BatchResult<?> f = batch.execute();
List<Object> list = (List<Object>) f.getResponses();
assertThat(list).containsExactly(1L, 2L, 3L, 2L);
assertThat(f1.toCompletableFuture().getNow(null)).isEqualTo(1);
@ -540,52 +504,37 @@ public class RedissonBatchTest extends BaseTest {
assertThat(f3.toCompletableFuture().getNow(null)).isEqualTo(3);
assertThat(d1.toCompletableFuture().getNow(null)).isEqualTo(2);
}
@ParameterizedTest
@MethodSource("data")
public void testAtomicSyncSlaves(BatchOptions batchOptions) throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
@ParameterizedTest
@MethodSource("data")
public void testAtomicSyncSlaves(BatchOptions batchOptions) {
testInCluster(client -> {
Config config = client.getConfig();
config.useClusterServers()
.setTimeout(123000);
RedissonClient redisson = Redisson.create(config);
batchOptions
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC)
.sync(1, Duration.ofSeconds(1));
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setTimeout(123000)
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
batchOptions
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC)
.sync(1, Duration.ofSeconds(1));
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 10; i++) {
batch.getAtomicLong("{test}" + i).addAndGetAsync(i);
}
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 10; i++) {
batch.getAtomicLong("{test}" + i).addAndGetAsync(i);
}
BatchResult<?> result = batch.execute();
assertThat(result.getSyncedSlaves()).isEqualTo(1);
int i = 0;
for (Object res : result.getResponses()) {
assertThat((Long)res).isEqualTo(i++);
}
BatchResult<?> result = batch.execute();
assertThat(result.getSyncedSlaves()).isEqualTo(1);
int i = 0;
for (Object res : result.getResponses()) {
assertThat((Long)res).isEqualTo(i++);
}
process.shutdown();
redisson.shutdown();
redisson.shutdown();
});
}
@ParameterizedTest
@MethodSource("data")
public void testDifferentCodecs(BatchOptions batchOptions) {
@ -613,7 +562,7 @@ public class RedissonBatchTest extends BaseTest {
org.junit.jupiter.api.Assertions.assertEquals("2", val1.toCompletableFuture().getNow(null));
org.junit.jupiter.api.Assertions.assertEquals("3", val2.toCompletableFuture().getNow(null));
}
@ParameterizedTest
@MethodSource("data")
public void testBatchList(BatchOptions batchOptions) {
@ -630,17 +579,17 @@ public class RedissonBatchTest extends BaseTest {
@MethodSource("data")
public void testBatchCancel() {
RedissonClient redisson = createInstance();
BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.IN_MEMORY);
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 10; i++) {
RFuture<Void> f = batch.getBucket("test").setAsync(123);
assertThat(f.cancel(true)).isTrue();
}
BatchResult<?> res = batch.execute();
org.junit.jupiter.api.Assertions.assertEquals(0, res.getResponses().size());
RBatch b2 = redisson.createBatch(batchOptions);
RListAsync<Integer> listAsync2 = b2.getList("list");
for (int i = 0; i < 6; i++) {
@ -651,7 +600,7 @@ public class RedissonBatchTest extends BaseTest {
RFuture<BatchResult<?>> res2 = b2.executeAsync();
assertThat(res2.cancel(true)).isFalse();
org.junit.jupiter.api.Assertions.assertEquals(0, res.getResponses().size());
redisson.shutdown();
}
@ -702,7 +651,7 @@ public class RedissonBatchTest extends BaseTest {
}
BatchResult<?> res = batch.execute();
org.junit.jupiter.api.Assertions.assertEquals(210*5, res.getResponses().size());
redisson.shutdown();
}
@ -733,7 +682,7 @@ public class RedissonBatchTest extends BaseTest {
RBatch batch = redisson.createBatch(batchOptions);
batch.execute();
}
@ParameterizedTest
@MethodSource("data")
@ -762,7 +711,7 @@ public class RedissonBatchTest extends BaseTest {
e.shutdown();
org.junit.jupiter.api.Assertions.assertTrue(e.awaitTermination(30, TimeUnit.SECONDS));
BatchResult<?> s = batch.execute();
int i = 0;
for (Object element : s.getResponses()) {
RFuture<Long> a = futures.get(i);

@ -26,7 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonBlockingQueueReactiveTest extends BaseReactiveTest {
@Test
public void testTakeElements2() throws InterruptedException {
public void testTakeElements2() {
RBlockingDequeReactive<Long> queue = redisson.getBlockingDeque("test");
Mono<Void> mono = Flux.range(1, 100)
@ -44,7 +44,7 @@ public class RedissonBlockingQueueReactiveTest extends BaseReactiveTest {
.repeat()
.subscribe();
Awaitility.await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> {
Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
assertThat(counter.get()).isEqualTo(100);
});
}

@ -1,47 +1,25 @@
package org.redisson.rx;
import java.io.IOException;
import java.util.Iterator;
import org.junit.jupiter.api.AfterAll;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.redisson.BaseTest;
import org.redisson.RedisRunner;
import org.redisson.Redisson;
import org.redisson.RedisDockerTest;
import org.redisson.api.RCollectionRx;
import org.redisson.api.RScoredSortedSetRx;
import org.redisson.api.RedissonRxClient;
import org.redisson.config.Config;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import java.io.IOException;
import java.util.Iterator;
public abstract class BaseRxTest {
public abstract class BaseRxTest extends RedisDockerTest {
protected RedissonRxClient redisson;
protected static RedissonRxClient defaultRedisson;
protected static RedissonRxClient redisson;
@BeforeAll
public static void beforeClass() throws IOException, InterruptedException {
RedisRunner.startDefaultRedisServerInstance();
defaultRedisson = createInstance();
}
@AfterAll
public static void afterClass() throws IOException, InterruptedException {
defaultRedisson.shutdown();
RedisRunner.shutDownDefaultRedisServerInstance();
}
@BeforeEach
public void before() throws IOException, InterruptedException {
if (redisson == null) {
redisson = defaultRedisson;
}
sync(redisson.getKeys().flushall());
redisson = RedisDockerTest.redisson.rxJava();
}
public static <V> V sync(Maybe<V> maybe) {
@ -68,9 +46,4 @@ public abstract class BaseRxTest {
return flowable.toList().blockingGet().iterator();
}
public static RedissonRxClient createInstance() {
Config config = BaseTest.createConfig();
return Redisson.create(config).rxJava();
}
}

@ -4,15 +4,10 @@ import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.redisson.BaseTest;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.api.*;
@ -87,7 +82,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
@ParameterizedTest
@MethodSource("data")
@Timeout(21)
@Timeout(40)
public void testPerformance(BatchOptions batchOptions) {
RMapRx<String, String> map = redisson.getMap("map");
Map<String, String> m = new HashMap<String, String>();
@ -108,7 +103,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
@Test
@Timeout(20)
public void testConnectionLeakAfterError() {
Config config = BaseTest.createConfig();
Config config = createConfig();
config.useSingleServer()
.setRetryInterval(100)
.setTimeout(200)
@ -164,49 +159,35 @@ public class RedissonBatchRxTest extends BaseRxTest {
@ParameterizedTest
@MethodSource("data")
public void testSyncSlaves(BatchOptions batchOptions) throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
public void testSyncSlaves(BatchOptions batchOptions) throws FailedToStartRedisException {
testInCluster(client -> {
Config config = client.getConfig();
config.useClusterServers()
.setTimeout(1000000)
.setRetryInterval(1000);
RedissonRxClient redisson = Redisson.create(config).rxJava();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setTimeout(1000000)
.setRetryInterval(1000)
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonRxClient redisson = Redisson.create(config).rxJava();
batchOptions
.syncSlaves(1, 1, TimeUnit.SECONDS);
RBatchRx batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 100; i++) {
RMapRx<String, String> map = batch.getMap("test");
map.put("" + i, "" + i);
}
batchOptions
.syncSlaves(1, 1, TimeUnit.SECONDS);
BatchResult<?> result = sync(batch.execute());
assertThat(result.getResponses()).hasSize(100);
assertThat(result.getSyncedSlaves()).isEqualTo(1);
process.shutdown();
redisson.shutdown();
RBatchRx batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 100; i++) {
RMapRx<String, String> map = batch.getMap("test");
map.put("" + i, "" + i);
}
BatchResult<?> result = sync(batch.execute());
assertThat(result.getResponses()).hasSize(100);
assertThat(result.getSyncedSlaves()).isEqualTo(1);
redisson.shutdown();
});
}
@ParameterizedTest
@MethodSource("data")
public void testWriteTimeout(BatchOptions batchOptions) throws InterruptedException {
Config config = BaseTest.createConfig();
Config config = createConfig();
config.useSingleServer().setRetryInterval(700).setTimeout(1500);
RedissonRxClient redisson = Redisson.create(config).rxJava();
@ -237,10 +218,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
@ParameterizedTest
@MethodSource("data")
public void testSkipResult(BatchOptions batchOptions) {
Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("3.2.0") > 0);
batchOptions
.skipResult();
batchOptions.skipResult();
RBatchRx batch = redisson.createBatch(batchOptions);
batch.getBucket("A1").set("001");
@ -289,44 +267,29 @@ public class RedissonBatchRxTest extends BaseRxTest {
@ParameterizedTest
@MethodSource("data")
public void testAtomicSyncSlaves(BatchOptions batchOptions) throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
testInCluster(client -> {
Config config = client.getConfig();
config.useClusterServers()
.setTimeout(123000);
RedissonRxClient redisson = Redisson.create(config).rxJava();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setTimeout(123000)
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonRxClient redisson = Redisson.create(config).rxJava();
batchOptions
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC)
.syncSlaves(1, 1, TimeUnit.SECONDS);
batchOptions.executionMode(ExecutionMode.IN_MEMORY_ATOMIC)
.syncSlaves(1, 1, TimeUnit.SECONDS);
RBatchRx batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 10; i++) {
batch.getAtomicLong("{test}" + i).addAndGet(i);
}
RBatchRx batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 10; i++) {
batch.getAtomicLong("{test}" + i).addAndGet(i);
}
BatchResult<?> result = sync(batch.execute());
assertThat(result.getSyncedSlaves()).isEqualTo(1);
int i = 0;
for (Object res : result.getResponses()) {
assertThat((Long)res).isEqualTo(i++);
}
process.shutdown();
redisson.shutdown();
BatchResult<?> result = sync(batch.execute());
assertThat(result.getSyncedSlaves()).isEqualTo(1);
int i = 0;
for (Object res : result.getResponses()) {
assertThat((Long)res).isEqualTo(i++);
}
redisson.shutdown();
});
}
@ -373,7 +336,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
@ParameterizedTest
@MethodSource("data")
public void testBatchBigRequest(BatchOptions batchOptions) {
Config config = BaseTest.createConfig();
Config config = createConfig();
config.useSingleServer().setTimeout(15000);
RedissonRxClient redisson = Redisson.create(config).rxJava();

@ -1,7 +1,10 @@
package org.redisson.rx;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RBlockingQueueRx;
import java.util.ArrayList;
import java.util.HashSet;
@ -12,16 +15,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RBlockingQueueRx;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RedissonBlockingQueueRxTest extends BaseRxTest {
@Test
public void testTakeElements() {
public void testTakeElements() throws InterruptedException {
RBlockingQueueRx<Integer> queue = redisson.getBlockingQueue("test");
List<Integer> elements = new ArrayList<>();
queue.takeElements().subscribe(new Subscriber<Integer>() {
@ -48,7 +48,9 @@ public class RedissonBlockingQueueRxTest extends BaseRxTest {
for (int i = 0; i < 10; i++) {
sync(queue.add(i));
}
Thread.sleep(500);
assertThat(elements).containsExactly(0, 1, 2, 3);
}

@ -1,29 +1,22 @@
package org.redisson.rx;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.api.ExpiredObjectListener;
import org.redisson.api.RMapCacheRx;
import org.redisson.api.RMapReactive;
import org.redisson.api.RMapRx;
import org.redisson.api.map.event.EntryEvent;
import org.redisson.api.map.event.EntryExpiredListener;
import org.redisson.codec.MsgPackJacksonCodec;
import java.io.Serializable;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonMapCacheRxTest extends BaseRxTest {
public static class SimpleKey implements Serializable {
@ -194,10 +187,10 @@ public class RedissonMapCacheRxTest extends BaseRxTest {
}
}).blockingGet();
Thread.sleep(5100);
assertThat(received).isTrue();
assertThat(cache.size().blockingGet()).isZero();
Awaitility.await().atMost(Duration.ofSeconds(6)).untilAsserted(() -> {
assertThat(received).isTrue();
assertThat(cache.size().blockingGet()).isZero();
});
}
@Test

Loading…
Cancel
Save