diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index 6d12a85fc..6cc4d7e1a 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -8,8 +8,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import org.redisson.ClusterRunner.ClusterProcesses; -import org.redisson.RedisRunner.FailedToStartRedisException; import org.redisson.api.*; import org.redisson.api.BatchOptions.ExecutionMode; import org.redisson.client.RedisClient; @@ -26,7 +24,6 @@ import org.redisson.misc.RedisURI; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; -import java.io.IOException; import java.time.Duration; import java.util.*; import java.util.concurrent.*; @@ -37,7 +34,7 @@ import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; -public class RedissonBatchTest extends BaseTest { +public class RedissonBatchTest extends RedisDockerTest { public static Iterable data() { return Arrays.asList(new Object[][] { @@ -146,62 +143,46 @@ public class RedissonBatchTest extends BaseTest { @ParameterizedTest @MethodSource("data") - public void testConnectionLeak(BatchOptions batchOptions) throws Exception { + public void testConnectionLeak(BatchOptions batchOptions) { Assumptions.assumeTrue(batchOptions.getExecutionMode() == ExecutionMode.IN_MEMORY); - RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); - RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); - RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterRunner.ClusterProcesses process = clusterRunner.run(); - - Thread.sleep(1000); - - Config config = new Config(); - config.useClusterServers() - .setConnectTimeout(500).setPingConnectionInterval(2000) - .setMasterConnectionMinimumIdleSize(1) - .setMasterConnectionPoolSize(1) - .setSlaveConnectionMinimumIdleSize(1) - .setSlaveConnectionPoolSize(1) - .setTimeout(100) - .setRetryAttempts(0) - .setRetryInterval(20) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); + testInCluster(client -> { + Config config = client.getConfig(); + config.useClusterServers() + .setConnectTimeout(500).setPingConnectionInterval(2000) + .setMasterConnectionMinimumIdleSize(1) + .setMasterConnectionPoolSize(1) + .setSlaveConnectionMinimumIdleSize(1) + .setSlaveConnectionPoolSize(1) + .setTimeout(100) + .setRetryAttempts(0) + .setRetryInterval(20); + RedissonClient redisson = Redisson.create(config); + + ExecutorService executorService = Executors.newFixedThreadPool(5); + AtomicInteger counter = new AtomicInteger(5*15); + AtomicBoolean hasErrors = new AtomicBoolean(); + for (int i = 0; i < 5; i++) { + executorService.submit(() -> { + for (int j = 0 ; j < 15; j++) { + executeBatch(redisson, batchOptions).whenComplete((r, e) -> { + if (e != null) { + hasErrors.set(true); + } + counter.decrementAndGet(); + }); + } + }); + } - ExecutorService executorService = Executors.newFixedThreadPool(5); - AtomicInteger counter = new AtomicInteger(5*15); - AtomicBoolean hasErrors = new AtomicBoolean(); - for (int i = 0; i < 5; i++) { - executorService.submit(() -> { - for (int j = 0 ; j < 15; j++) { - executeBatch(redisson, batchOptions).whenComplete((r, e) -> { - if (e != null) { - hasErrors.set(true); - } - counter.decrementAndGet(); - }); - } - }); - } + Awaitility.await().atMost(13, TimeUnit.SECONDS).until(() -> { + return counter.get() == 0; + }); + Assertions.assertThat(hasErrors).isTrue(); - Awaitility.await().atMost(13, TimeUnit.SECONDS).until(() -> { - return counter.get() == 0; + executeBatch(redisson, batchOptions).toCompletableFuture().join(); + redisson.shutdown(); }); - Assertions.assertThat(hasErrors).isTrue(); - - executeBatch(redisson, batchOptions).toCompletableFuture().join(); - - redisson.shutdown(); - process.shutdown(); } public RFuture> executeBatch(RedissonClient client, BatchOptions batchOptions) { @@ -223,25 +204,25 @@ public class RedissonBatchTest extends BaseTest { batch.execute(); assertThat(f1.get()).isEqualTo(1d); assertThat(f2.get()).isNull(); - + RScoredSortedSet set = redisson.getScoredSortedSet("myZKey"); assertThat(set.getScore("abc")).isEqualTo(1d); RBucket bucket = redisson.getBucket("test"); assertThat(bucket.get()).isEqualTo("1"); - + RBatch batch2 = redisson.createBatch(batchOptions); RFuture b2f1 = batch2.getScoredSortedSet("myZKey2").addScoreAsync("abc", 1d); RFuture b2f2 = batch2.getScoredSortedSet("myZKey2").addScoreAsync("abc", 1d); batch2.execute(); - + assertThat(b2f1.get()).isEqualTo(1d); assertThat(b2f2.get()).isEqualTo(2d); } - + @ParameterizedTest @MethodSource("data") - @Timeout(20) - public void testPerformance() { + @Timeout(40) + public void testPerformance(BatchOptions batchOptions) { RMap map = redisson.getMap("map"); Map m = new HashMap(); for (int j = 0; j < 1000; j++) { @@ -250,7 +231,7 @@ public class RedissonBatchTest extends BaseTest { map.putAll(m); for (int i = 0; i < 10000; i++) { - RBatch rBatch = redisson.createBatch(); + RBatch rBatch = redisson.createBatch(batchOptions); RMapAsync m1 = rBatch.getMap("map"); m1.getAllAsync(m.keySet()); try { @@ -304,7 +285,7 @@ public class RedissonBatchTest extends BaseTest { .setConnectionMinimumIdleSize(1).setConnectionPoolSize(1); RedissonClient redisson = Redisson.create(config); - + BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC); RBatch batch1 = redisson.createBatch(batchOptions); for (int i = 0; i < 150000; i++) { @@ -320,18 +301,18 @@ public class RedissonBatchTest extends BaseTest { redisson.getBucket("test3").set(4); assertThat(redisson.getBucket("test3").get()).isEqualTo(4); - + RBatch batch = redisson.createBatch(batchOptions); batch.getBucket("test1").setAsync(1); batch.getBucket("test2").setAsync(2); batch.execute(); - + assertThat(redisson.getBucket("test1").get()).isEqualTo(1); assertThat(redisson.getBucket("test2").get()).isEqualTo(2); - + redisson.shutdown(); } - + @ParameterizedTest @MethodSource("data") public void testBigRequestAtomic(BatchOptions batchOptions) { @@ -340,13 +321,13 @@ public class RedissonBatchTest extends BaseTest { .responseTimeout(15, TimeUnit.SECONDS) .retryInterval(1, TimeUnit.SECONDS) .retryAttempts(5); - + RBatch batch = redisson.createBatch(batchOptions); for (int i = 0; i < 100; i++) { batch.getBucket("" + i).setAsync(i); batch.getBucket("" + i).getAsync(); } - + BatchResult s = batch.execute(); assertThat(s.getResponses().size()).isEqualTo(200); } @@ -420,43 +401,29 @@ public class RedissonBatchTest extends BaseTest { @ParameterizedTest @MethodSource("data") @Timeout(value = 20, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) - public void testSyncSlaves(BatchOptions batchOptions) throws FailedToStartRedisException, IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + public void testSyncSlaves(BatchOptions batchOptions) { + testInCluster(client -> { + Config config = client.getConfig(); + config.useClusterServers() + .setTimeout(1000000) + .setRetryInterval(1000000); + RedissonClient redisson = Redisson.create(config); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterProcesses process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setTimeout(1000000) - .setRetryInterval(1000000) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - batchOptions - .sync(1, Duration.ofSeconds(1)); - - RBatch batch = redisson.createBatch(batchOptions); - for (int i = 0; i < 100; i++) { - RMapAsync map = batch.getMap("test"); - map.putAsync("" + i, "" + i); - } + batchOptions + .sync(1, Duration.ofSeconds(1)); - BatchResult result = batch.execute(); - assertThat(result.getResponses()).hasSize(100); - assertThat(result.getSyncedSlaves()).isEqualTo(1); - - process.shutdown(); - redisson.shutdown(); + RBatch batch = redisson.createBatch(batchOptions); + for (int i = 0; i < 100; i++) { + RMapAsync map = batch.getMap("test"); + map.putAsync("" + i, "" + i); + } + + BatchResult result = batch.execute(); + assertThat(result.getResponses()).hasSize(100); + assertThat(result.getSyncedSlaves()).isEqualTo(1); + + redisson.shutdown(); + }); } @ParameterizedTest @@ -475,7 +442,7 @@ public class RedissonBatchTest extends BaseTest { ((BatchPromise)f.toCompletableFuture()).getSentPromise().join(); } } - + long s = System.currentTimeMillis(); batch.execute(); long executionTime = System.currentTimeMillis() - s; @@ -487,14 +454,11 @@ public class RedissonBatchTest extends BaseTest { assertThat(redisson.getMapCache("test").size()).isEqualTo(total); redisson.shutdown(); } - + @ParameterizedTest @MethodSource("data") public void testSkipResult(BatchOptions batchOptions) { - Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("3.2.0") > 0); - - batchOptions - .skipResult(); + batchOptions.skipResult(); RBatch batch = redisson.createBatch(batchOptions); batch.getBucket("A1").setAsync("001"); @@ -503,11 +467,11 @@ public class RedissonBatchTest extends BaseTest { batch.getKeys().deleteAsync("A1"); batch.getKeys().deleteAsync("A2"); batch.execute(); - + assertThat(redisson.getBucket("A1").isExists()).isFalse(); assertThat(redisson.getBucket("A3").isExists()).isTrue(); } - + @ParameterizedTest @MethodSource("data") public void testBatchNPE(BatchOptions batchOptions) { @@ -525,14 +489,14 @@ public class RedissonBatchTest extends BaseTest { public void testAtomic(BatchOptions batchOptions) { batchOptions .executionMode(ExecutionMode.IN_MEMORY_ATOMIC); - + RBatch batch = redisson.createBatch(batchOptions); RFuture f1 = batch.getAtomicLong("A1").addAndGetAsync(1); RFuture f2 = batch.getAtomicLong("A2").addAndGetAsync(2); RFuture f3 = batch.getAtomicLong("A3").addAndGetAsync(3); RFuture d1 = batch.getKeys().deleteAsync("A1", "A2"); BatchResult f = batch.execute(); - + List list = (List) f.getResponses(); assertThat(list).containsExactly(1L, 2L, 3L, 2L); assertThat(f1.toCompletableFuture().getNow(null)).isEqualTo(1); @@ -540,52 +504,37 @@ public class RedissonBatchTest extends BaseTest { assertThat(f3.toCompletableFuture().getNow(null)).isEqualTo(3); assertThat(d1.toCompletableFuture().getNow(null)).isEqualTo(2); } - - @ParameterizedTest -@MethodSource("data") - public void testAtomicSyncSlaves(BatchOptions batchOptions) throws FailedToStartRedisException, IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + @ParameterizedTest + @MethodSource("data") + public void testAtomicSyncSlaves(BatchOptions batchOptions) { + testInCluster(client -> { + Config config = client.getConfig(); + config.useClusterServers() + .setTimeout(123000); + RedissonClient redisson = Redisson.create(config); + + batchOptions + .executionMode(ExecutionMode.IN_MEMORY_ATOMIC) + .sync(1, Duration.ofSeconds(1)); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterProcesses process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setTimeout(123000) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - batchOptions - .executionMode(ExecutionMode.IN_MEMORY_ATOMIC) - .sync(1, Duration.ofSeconds(1)); + RBatch batch = redisson.createBatch(batchOptions); + for (int i = 0; i < 10; i++) { + batch.getAtomicLong("{test}" + i).addAndGetAsync(i); + } - RBatch batch = redisson.createBatch(batchOptions); - for (int i = 0; i < 10; i++) { - batch.getAtomicLong("{test}" + i).addAndGetAsync(i); - } + BatchResult result = batch.execute(); + assertThat(result.getSyncedSlaves()).isEqualTo(1); + int i = 0; + for (Object res : result.getResponses()) { + assertThat((Long)res).isEqualTo(i++); + } - BatchResult result = batch.execute(); - assertThat(result.getSyncedSlaves()).isEqualTo(1); - int i = 0; - for (Object res : result.getResponses()) { - assertThat((Long)res).isEqualTo(i++); - } - - process.shutdown(); - redisson.shutdown(); + redisson.shutdown(); + }); } - + @ParameterizedTest @MethodSource("data") public void testDifferentCodecs(BatchOptions batchOptions) { @@ -613,7 +562,7 @@ public class RedissonBatchTest extends BaseTest { org.junit.jupiter.api.Assertions.assertEquals("2", val1.toCompletableFuture().getNow(null)); org.junit.jupiter.api.Assertions.assertEquals("3", val2.toCompletableFuture().getNow(null)); } - + @ParameterizedTest @MethodSource("data") public void testBatchList(BatchOptions batchOptions) { @@ -630,17 +579,17 @@ public class RedissonBatchTest extends BaseTest { @MethodSource("data") public void testBatchCancel() { RedissonClient redisson = createInstance(); - + BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.IN_MEMORY); RBatch batch = redisson.createBatch(batchOptions); for (int i = 0; i < 10; i++) { RFuture f = batch.getBucket("test").setAsync(123); assertThat(f.cancel(true)).isTrue(); } - + BatchResult res = batch.execute(); org.junit.jupiter.api.Assertions.assertEquals(0, res.getResponses().size()); - + RBatch b2 = redisson.createBatch(batchOptions); RListAsync listAsync2 = b2.getList("list"); for (int i = 0; i < 6; i++) { @@ -651,7 +600,7 @@ public class RedissonBatchTest extends BaseTest { RFuture> res2 = b2.executeAsync(); assertThat(res2.cancel(true)).isFalse(); org.junit.jupiter.api.Assertions.assertEquals(0, res.getResponses().size()); - + redisson.shutdown(); } @@ -702,7 +651,7 @@ public class RedissonBatchTest extends BaseTest { } BatchResult res = batch.execute(); org.junit.jupiter.api.Assertions.assertEquals(210*5, res.getResponses().size()); - + redisson.shutdown(); } @@ -733,7 +682,7 @@ public class RedissonBatchTest extends BaseTest { RBatch batch = redisson.createBatch(batchOptions); batch.execute(); } - + @ParameterizedTest @MethodSource("data") @@ -762,7 +711,7 @@ public class RedissonBatchTest extends BaseTest { e.shutdown(); org.junit.jupiter.api.Assertions.assertTrue(e.awaitTermination(30, TimeUnit.SECONDS)); BatchResult s = batch.execute(); - + int i = 0; for (Object element : s.getResponses()) { RFuture a = futures.get(i); diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java index bdfb28a30..190137a1c 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java @@ -26,7 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat; public class RedissonBlockingQueueReactiveTest extends BaseReactiveTest { @Test - public void testTakeElements2() throws InterruptedException { + public void testTakeElements2() { RBlockingDequeReactive queue = redisson.getBlockingDeque("test"); Mono mono = Flux.range(1, 100) @@ -44,7 +44,7 @@ public class RedissonBlockingQueueReactiveTest extends BaseReactiveTest { .repeat() .subscribe(); - Awaitility.await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> { + Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> { assertThat(counter.get()).isEqualTo(100); }); } diff --git a/redisson/src/test/java/org/redisson/rx/BaseRxTest.java b/redisson/src/test/java/org/redisson/rx/BaseRxTest.java index cbef061b9..cf97f02eb 100644 --- a/redisson/src/test/java/org/redisson/rx/BaseRxTest.java +++ b/redisson/src/test/java/org/redisson/rx/BaseRxTest.java @@ -1,47 +1,25 @@ package org.redisson.rx; -import java.io.IOException; -import java.util.Iterator; - -import org.junit.jupiter.api.AfterAll; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Single; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.redisson.BaseTest; -import org.redisson.RedisRunner; -import org.redisson.Redisson; +import org.redisson.RedisDockerTest; import org.redisson.api.RCollectionRx; import org.redisson.api.RScoredSortedSetRx; import org.redisson.api.RedissonRxClient; -import org.redisson.config.Config; -import io.reactivex.rxjava3.core.Completable; -import io.reactivex.rxjava3.core.Flowable; -import io.reactivex.rxjava3.core.Maybe; -import io.reactivex.rxjava3.core.Single; +import java.io.IOException; +import java.util.Iterator; -public abstract class BaseRxTest { +public abstract class BaseRxTest extends RedisDockerTest { - protected RedissonRxClient redisson; - protected static RedissonRxClient defaultRedisson; + protected static RedissonRxClient redisson; @BeforeAll public static void beforeClass() throws IOException, InterruptedException { - RedisRunner.startDefaultRedisServerInstance(); - defaultRedisson = createInstance(); - } - - @AfterAll - public static void afterClass() throws IOException, InterruptedException { - defaultRedisson.shutdown(); - RedisRunner.shutDownDefaultRedisServerInstance(); - } - - @BeforeEach - public void before() throws IOException, InterruptedException { - if (redisson == null) { - redisson = defaultRedisson; - } - sync(redisson.getKeys().flushall()); + redisson = RedisDockerTest.redisson.rxJava(); } public static V sync(Maybe maybe) { @@ -68,9 +46,4 @@ public abstract class BaseRxTest { return flowable.toList().blockingGet().iterator(); } - public static RedissonRxClient createInstance() { - Config config = BaseTest.createConfig(); - return Redisson.create(config).rxJava(); - } - } diff --git a/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java b/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java index afdd18ae7..92e119fb3 100644 --- a/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java +++ b/redisson/src/test/java/org/redisson/rx/RedissonBatchRxTest.java @@ -4,15 +4,10 @@ import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Single; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import org.redisson.BaseTest; -import org.redisson.ClusterRunner; -import org.redisson.ClusterRunner.ClusterProcesses; -import org.redisson.RedisRunner; import org.redisson.RedisRunner.FailedToStartRedisException; import org.redisson.Redisson; import org.redisson.api.*; @@ -87,7 +82,7 @@ public class RedissonBatchRxTest extends BaseRxTest { @ParameterizedTest @MethodSource("data") - @Timeout(21) + @Timeout(40) public void testPerformance(BatchOptions batchOptions) { RMapRx map = redisson.getMap("map"); Map m = new HashMap(); @@ -108,7 +103,7 @@ public class RedissonBatchRxTest extends BaseRxTest { @Test @Timeout(20) public void testConnectionLeakAfterError() { - Config config = BaseTest.createConfig(); + Config config = createConfig(); config.useSingleServer() .setRetryInterval(100) .setTimeout(200) @@ -164,49 +159,35 @@ public class RedissonBatchRxTest extends BaseRxTest { @ParameterizedTest @MethodSource("data") - public void testSyncSlaves(BatchOptions batchOptions) throws FailedToStartRedisException, IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + public void testSyncSlaves(BatchOptions batchOptions) throws FailedToStartRedisException { + testInCluster(client -> { + Config config = client.getConfig(); + config.useClusterServers() + .setTimeout(1000000) + .setRetryInterval(1000); + RedissonRxClient redisson = Redisson.create(config).rxJava(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterProcesses process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setTimeout(1000000) - .setRetryInterval(1000) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonRxClient redisson = Redisson.create(config).rxJava(); - - batchOptions - .syncSlaves(1, 1, TimeUnit.SECONDS); - - RBatchRx batch = redisson.createBatch(batchOptions); - for (int i = 0; i < 100; i++) { - RMapRx map = batch.getMap("test"); - map.put("" + i, "" + i); - } + batchOptions + .syncSlaves(1, 1, TimeUnit.SECONDS); - BatchResult result = sync(batch.execute()); - assertThat(result.getResponses()).hasSize(100); - assertThat(result.getSyncedSlaves()).isEqualTo(1); - - process.shutdown(); - redisson.shutdown(); + RBatchRx batch = redisson.createBatch(batchOptions); + for (int i = 0; i < 100; i++) { + RMapRx map = batch.getMap("test"); + map.put("" + i, "" + i); + } + + BatchResult result = sync(batch.execute()); + assertThat(result.getResponses()).hasSize(100); + assertThat(result.getSyncedSlaves()).isEqualTo(1); + + redisson.shutdown(); + }); } @ParameterizedTest @MethodSource("data") public void testWriteTimeout(BatchOptions batchOptions) throws InterruptedException { - Config config = BaseTest.createConfig(); + Config config = createConfig(); config.useSingleServer().setRetryInterval(700).setTimeout(1500); RedissonRxClient redisson = Redisson.create(config).rxJava(); @@ -237,10 +218,7 @@ public class RedissonBatchRxTest extends BaseRxTest { @ParameterizedTest @MethodSource("data") public void testSkipResult(BatchOptions batchOptions) { - Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("3.2.0") > 0); - - batchOptions - .skipResult(); + batchOptions.skipResult(); RBatchRx batch = redisson.createBatch(batchOptions); batch.getBucket("A1").set("001"); @@ -289,44 +267,29 @@ public class RedissonBatchRxTest extends BaseRxTest { @ParameterizedTest @MethodSource("data") public void testAtomicSyncSlaves(BatchOptions batchOptions) throws FailedToStartRedisException, IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + testInCluster(client -> { + Config config = client.getConfig(); + config.useClusterServers() + .setTimeout(123000); + RedissonRxClient redisson = Redisson.create(config).rxJava(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterProcesses process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setTimeout(123000) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonRxClient redisson = Redisson.create(config).rxJava(); - - batchOptions - .executionMode(ExecutionMode.IN_MEMORY_ATOMIC) - .syncSlaves(1, 1, TimeUnit.SECONDS); + batchOptions.executionMode(ExecutionMode.IN_MEMORY_ATOMIC) + .syncSlaves(1, 1, TimeUnit.SECONDS); - RBatchRx batch = redisson.createBatch(batchOptions); - for (int i = 0; i < 10; i++) { - batch.getAtomicLong("{test}" + i).addAndGet(i); - } + RBatchRx batch = redisson.createBatch(batchOptions); + for (int i = 0; i < 10; i++) { + batch.getAtomicLong("{test}" + i).addAndGet(i); + } - BatchResult result = sync(batch.execute()); - assertThat(result.getSyncedSlaves()).isEqualTo(1); - int i = 0; - for (Object res : result.getResponses()) { - assertThat((Long)res).isEqualTo(i++); - } - - process.shutdown(); - redisson.shutdown(); + BatchResult result = sync(batch.execute()); + assertThat(result.getSyncedSlaves()).isEqualTo(1); + int i = 0; + for (Object res : result.getResponses()) { + assertThat((Long)res).isEqualTo(i++); + } + + redisson.shutdown(); + }); } @@ -373,7 +336,7 @@ public class RedissonBatchRxTest extends BaseRxTest { @ParameterizedTest @MethodSource("data") public void testBatchBigRequest(BatchOptions batchOptions) { - Config config = BaseTest.createConfig(); + Config config = createConfig(); config.useSingleServer().setTimeout(15000); RedissonRxClient redisson = Redisson.create(config).rxJava(); diff --git a/redisson/src/test/java/org/redisson/rx/RedissonBlockingQueueRxTest.java b/redisson/src/test/java/org/redisson/rx/RedissonBlockingQueueRxTest.java index f64dea418..a8b2448f1 100644 --- a/redisson/src/test/java/org/redisson/rx/RedissonBlockingQueueRxTest.java +++ b/redisson/src/test/java/org/redisson/rx/RedissonBlockingQueueRxTest.java @@ -1,7 +1,10 @@ package org.redisson.rx; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.redisson.api.RBlockingQueueRx; import java.util.ArrayList; import java.util.HashSet; @@ -12,16 +15,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.redisson.api.RBlockingQueueRx; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; public class RedissonBlockingQueueRxTest extends BaseRxTest { @Test - public void testTakeElements() { + public void testTakeElements() throws InterruptedException { RBlockingQueueRx queue = redisson.getBlockingQueue("test"); List elements = new ArrayList<>(); queue.takeElements().subscribe(new Subscriber() { @@ -48,7 +48,9 @@ public class RedissonBlockingQueueRxTest extends BaseRxTest { for (int i = 0; i < 10; i++) { sync(queue.add(i)); } - + + Thread.sleep(500); + assertThat(elements).containsExactly(0, 1, 2, 3); } diff --git a/redisson/src/test/java/org/redisson/rx/RedissonMapCacheRxTest.java b/redisson/src/test/java/org/redisson/rx/RedissonMapCacheRxTest.java index 6b999846d..f27b43f54 100644 --- a/redisson/src/test/java/org/redisson/rx/RedissonMapCacheRxTest.java +++ b/redisson/src/test/java/org/redisson/rx/RedissonMapCacheRxTest.java @@ -1,29 +1,22 @@ package org.redisson.rx; -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - +import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.redisson.api.ExpiredObjectListener; import org.redisson.api.RMapCacheRx; -import org.redisson.api.RMapReactive; import org.redisson.api.RMapRx; import org.redisson.api.map.event.EntryEvent; import org.redisson.api.map.event.EntryExpiredListener; import org.redisson.codec.MsgPackJacksonCodec; +import java.io.Serializable; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.assertj.core.api.Assertions.assertThat; + public class RedissonMapCacheRxTest extends BaseRxTest { public static class SimpleKey implements Serializable { @@ -194,10 +187,10 @@ public class RedissonMapCacheRxTest extends BaseRxTest { } }).blockingGet(); - Thread.sleep(5100); - - assertThat(received).isTrue(); - assertThat(cache.size().blockingGet()).isZero(); + Awaitility.await().atMost(Duration.ofSeconds(6)).untilAsserted(() -> { + assertThat(received).isTrue(); + assertThat(cache.size().blockingGet()).isZero(); + }); } @Test