diff --git a/redisson/src/test/java/org/redisson/BaseConcurrentTest.java b/redisson/src/test/java/org/redisson/BaseConcurrentTest.java index 389a9125a..b432d60c5 100644 --- a/redisson/src/test/java/org/redisson/BaseConcurrentTest.java +++ b/redisson/src/test/java/org/redisson/BaseConcurrentTest.java @@ -12,7 +12,7 @@ import org.redisson.config.Config; import io.netty.channel.nio.NioEventLoopGroup; -public abstract class BaseConcurrentTest extends BaseTest { +public abstract class BaseConcurrentTest extends RedisDockerTest { protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2); @@ -59,39 +59,9 @@ public abstract class BaseConcurrentTest extends BaseTest { Assertions.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); } - protected void testMultiInstanceConcurrencySequentiallyLaunched(int iterations, final RedissonRunnable runnable) throws InterruptedException { - System.out.println("Multi Instance Concurrent Job Interation: " + iterations); - ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); - - final Map instances = new HashMap(); - for (int i = 0; i < iterations; i++) { - instances.put(i, BaseTest.createInstance()); - } - - long watch = System.currentTimeMillis(); - for (int i = 0; i < iterations; i++) { - final int n = i; - executor.execute(() -> runnable.run(instances.get(n))); - } - - executor.shutdown(); - Assertions.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); - - System.out.println("multi: " + (System.currentTimeMillis() - watch)); - - executor = Executors.newCachedThreadPool(); - - for (final RedissonClient redisson : instances.values()) { - executor.execute(() -> redisson.shutdown()); - } - - executor.shutdown(); - Assertions.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); - } - protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { System.out.println("Single Instance Concurrent Job Interation: " + iterations); - final RedissonClient r = BaseTest.createInstance(); + final RedissonClient r = createInstance(); long watch = System.currentTimeMillis(); ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); diff --git a/redisson/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java b/redisson/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java index d6d9df0ef..b732bc9e9 100644 --- a/redisson/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java @@ -20,12 +20,11 @@ public class ConcurrentRedissonSortedSetTest extends BaseConcurrentTest { public void testAdd_SingleInstance() throws InterruptedException { final String name = "testAdd_SingleInstance"; - RedissonClient r = BaseTest.createInstance(); - RSortedSet map = r.getSortedSet(name); + RSortedSet map = redisson.getSortedSet(name); map.clear(); int length = 5000; - final List elements = new ArrayList(); + final List elements = new ArrayList<>(); for (int i = 1; i < length + 1; i++) { elements.add(i); } @@ -43,15 +42,13 @@ public class ConcurrentRedissonSortedSetTest extends BaseConcurrentTest { assertThat(map).containsExactly(p); map.clear(); - r.shutdown(); } @Test public void testAddRemove_SingleInstance() throws InterruptedException, NoSuchAlgorithmException { final String name = "testAddNegative_SingleInstance"; - RedissonClient r = BaseTest.createInstance(); - RSortedSet map = r.getSortedSet(name); + RSortedSet map = redisson.getSortedSet(name); map.clear(); int length = 1000; for (int i = 0; i < length; i++) { @@ -79,8 +76,6 @@ public class ConcurrentRedissonSortedSetTest extends BaseConcurrentTest { Assertions.fail(); } } - - r.shutdown(); } } diff --git a/redisson/src/test/java/org/redisson/RedissonLockTest.java b/redisson/src/test/java/org/redisson/RedissonLockTest.java index ff2a1af9c..5509ce2bb 100644 --- a/redisson/src/test/java/org/redisson/RedissonLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLockTest.java @@ -7,9 +7,8 @@ import org.redisson.api.RedissonClient; import org.redisson.client.*; import org.redisson.client.protocol.RedisCommands; import org.redisson.config.Config; -import org.redisson.connection.balancer.RandomLoadBalancer; +import org.testcontainers.containers.GenericContainer; -import java.io.IOException; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -81,19 +80,13 @@ public class RedissonLockTest extends BaseConcurrentTest { } @Test - public void testSubscriptionsPerConnection() throws InterruptedException, IOException { - RedisRunner.RedisProcess runner = new RedisRunner() - .port(RedisRunner.findFreePort()) - .nosave() - .randomDir() - .run(); - + public void testSubscriptionsPerConnection() throws InterruptedException { Config config = new Config(); config.useSingleServer() .setSubscriptionConnectionPoolSize(1) .setSubscriptionConnectionMinimumIdleSize(1) .setSubscriptionsPerConnection(1) - .setAddress(runner.getRedisServerAddressAndPort()); + .setAddress(redisson.getConfig().useSingleServer().getAddress()); RedissonClient redisson = Redisson.create(config); ExecutorService e = Executors.newFixedThreadPool(32); @@ -124,7 +117,7 @@ public class RedissonLockTest extends BaseConcurrentTest { assertThat(errors.get()).isZero(); RedisClientConfig cc = new RedisClientConfig(); - cc.setAddress(runner.getRedisServerAddressAndPort()); + cc.setAddress(redisson.getConfig().useSingleServer().getAddress()); RedisClient c = RedisClient.create(cc); RedisConnection ccc = c.connect(); List channels = ccc.sync(RedisCommands.PUBSUB_CHANNELS); @@ -132,20 +125,13 @@ public class RedissonLockTest extends BaseConcurrentTest { c.shutdown(); redisson.shutdown(); - runner.stop(); } @Test - public void testSinglePubSub() throws IOException, InterruptedException, ExecutionException { - RedisRunner.RedisProcess runner = new RedisRunner() - .port(RedisRunner.findFreePort()) - .nosave() - .randomDir() - .run(); - + public void testSinglePubSub() throws InterruptedException, ExecutionException { Config config = new Config(); config.useSingleServer() - .setAddress(runner.getRedisServerAddressAndPort()) + .setAddress(redisson.getConfig().useSingleServer().getAddress()) .setSubscriptionConnectionPoolSize(1) .setSubscriptionsPerConnection(1); ExecutorService executorService = Executors.newFixedThreadPool(4); @@ -165,7 +151,6 @@ public class RedissonLockTest extends BaseConcurrentTest { assertThat(hasFails).isFalse(); redissonClient.shutdown(); - runner.stop(); } @Test @@ -193,23 +178,25 @@ public class RedissonLockTest extends BaseConcurrentTest { @Test public void testRedisFailed() { - Assertions.assertThrows(WriteRedisConnectionException.class, () -> { - RedisRunner.RedisProcess master = new RedisRunner() - .port(6377) - .nosave() - .randomDir() - .run(); + GenericContainer redis = + new GenericContainer<>("redis:7.2") + .withExposedPorts(6379); + redis.start(); - Config config = new Config(); - config.useSingleServer().setAddress("redis://127.0.0.1:6377"); - RedissonClient redisson = Redisson.create(config); + Config config = new Config(); + config.useSingleServer().setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort()); + RedissonClient redisson = Redisson.create(config); + + Assertions.assertThrows(WriteRedisConnectionException.class, () -> { RLock lock = redisson.getLock("myLock"); // kill RedisServer while main thread is sleeping. - master.stop(); + redis.stop(); Thread.sleep(3000); lock.tryLock(5, 10, TimeUnit.SECONDS); }); + + redisson.shutdown(); } @Test @@ -223,7 +210,7 @@ public class RedissonLockTest extends BaseConcurrentTest { long startTime = System.currentTimeMillis(); lock.tryLock(3, TimeUnit.SECONDS); - assertThat(System.currentTimeMillis() - startTime).isBetween(2990L, 3100L); + assertThat(System.currentTimeMillis() - startTime).isBetween(2990L, 3350L); } @Test @@ -289,35 +276,13 @@ public class RedissonLockTest extends BaseConcurrentTest { @Test public void testInCluster() throws Exception { - RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); - RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); - RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterRunner.ClusterProcesses process = clusterRunner.run(); - - Thread.sleep(5000); - - Config config = new Config(); - config.useClusterServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - RLock lock = redisson.getLock("myLock"); - lock.lock(); - assertThat(lock.isLocked()).isTrue(); - lock.unlock(); - assertThat(lock.isLocked()).isFalse(); - - redisson.shutdown(); - process.shutdown(); + testInCluster(redisson -> { + RLock lock = redisson.getLock("myLock"); + lock.lock(); + assertThat(lock.isLocked()).isTrue(); + lock.unlock(); + assertThat(lock.isLocked()).isFalse(); + }); } diff --git a/redisson/src/test/java/org/redisson/RedissonPermitExpirableSemaphoreTest.java b/redisson/src/test/java/org/redisson/RedissonPermitExpirableSemaphoreTest.java index 41e3bb057..364aa4172 100644 --- a/redisson/src/test/java/org/redisson/RedissonPermitExpirableSemaphoreTest.java +++ b/redisson/src/test/java/org/redisson/RedissonPermitExpirableSemaphoreTest.java @@ -23,43 +23,35 @@ import static org.assertj.core.api.Assertions.assertThat; public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest { @Test - public void testGetInClusterNameMapper() throws RedisRunner.FailedToStartRedisException, IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterRunner.ClusterProcesses process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setNameMapper(new NameMapper() { - @Override - public String map(String name) { - return "test::" + name; - } - - @Override - public String unmap(String name) { - return name.replace("test::", ""); - } - }) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); + public void testGetInClusterNameMapper() throws RedisRunner.FailedToStartRedisException, InterruptedException { + testInCluster(client -> { + Config config = client.getConfig(); + config.useClusterServers() + .setNameMapper(new NameMapper() { + @Override + public String map(String name) { + return "test::" + name; + } + + @Override + public String unmap(String name) { + return name.replace("test::", ""); + } + }); + RedissonClient redisson = Redisson.create(config); + + RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("semaphore"); + s.trySetPermits(1); + try { + String v = s.acquire(); + s.release(v); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("semaphore"); - s.trySetPermits(1); - String v = s.acquire(); - s.release(v); + redisson.shutdown(); - redisson.shutdown(); - process.shutdown(); + }); } @Test diff --git a/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java b/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java index 54daf8e1c..3720c3e9f 100644 --- a/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java @@ -50,7 +50,7 @@ public class RedissonRateLimiterTest extends RedisDockerTest { Thread.sleep(1000); } - assertThat(sizes.stream().filter(s -> s == rate).count()).isGreaterThan(16); + assertThat(sizes.stream().filter(s -> s == rate).count()).isGreaterThanOrEqualTo(16); e.shutdownNow(); } diff --git a/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java b/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java index 5ff6e46aa..ea895080c 100644 --- a/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java @@ -161,30 +161,14 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { } @Test - public void testInCluster() throws Exception { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1) - .addNode(master2) - .addNode(master3); - ClusterProcesses process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - RReadWriteLock s = redisson.getReadWriteLock("1234"); - s.writeLock().lock(); - s.readLock().lock(); - s.readLock().unlock(); - s.writeLock().unlock(); - - redisson.shutdown(); - process.shutdown(); + public void testInCluster() { + testInCluster(redisson -> { + RReadWriteLock s = redisson.getReadWriteLock("1234"); + s.writeLock().lock(); + s.readLock().lock(); + s.readLock().unlock(); + s.writeLock().unlock(); + }); } @Test diff --git a/redisson/src/test/java/org/redisson/RedissonSpinLockTest.java b/redisson/src/test/java/org/redisson/RedissonSpinLockTest.java index cee8c96bd..866c88c2d 100644 --- a/redisson/src/test/java/org/redisson/RedissonSpinLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSpinLockTest.java @@ -9,6 +9,7 @@ import org.redisson.api.RedissonClient; import org.redisson.client.WriteRedisConnectionException; import org.redisson.config.Config; import org.redisson.connection.balancer.RandomLoadBalancer; +import org.testcontainers.containers.GenericContainer; import java.io.IOException; import java.time.Duration; @@ -56,24 +57,26 @@ public class RedissonSpinLockTest extends BaseConcurrentTest { } @Test - public void testRedisFailed() throws IOException, InterruptedException { - RedisRunner.RedisProcess master = new RedisRunner() - .port(6377) - .nosave() - .randomDir() - .run(); + public void testRedisFailed() { + GenericContainer redis = + new GenericContainer<>("redis:7.2") + .withExposedPorts(6379); + redis.start(); Config config = new Config(); - config.useSingleServer().setAddress("redis://127.0.0.1:6377"); + config.useSingleServer().setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort()); RedissonClient redisson = Redisson.create(config); Assertions.assertThrows(WriteRedisConnectionException.class, () -> { + RLock lock = redisson.getSpinLock("myLock"); // kill RedisServer while main thread is sleeping. - master.stop(); + redis.stop(); Thread.sleep(3000); lock.tryLock(5, 10, TimeUnit.SECONDS); }); + + redisson.shutdown(); } @Test @@ -144,38 +147,20 @@ public class RedissonSpinLockTest extends BaseConcurrentTest { @Test public void testInCluster() throws Exception { - RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); - RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); - RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - ClusterRunner.ClusterProcesses process = clusterRunner.run(); - - Thread.sleep(5000); + testInCluster(client -> { + Config config = client.getConfig(); + config.setSlavesSyncTimeout(3000); - Config config = new Config(); - config.useClusterServers() - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - RedissonClient redisson = Redisson.create(config); - - RLock lock = redisson.getSpinLock("myLock"); - lock.lock(); - assertThat(lock.isLocked()).isTrue(); - lock.unlock(); - assertThat(lock.isLocked()).isFalse(); - - redisson.shutdown(); - process.shutdown(); + RedissonClient redisson = Redisson.create(config); + RLock lock = redisson.getSpinLock("myLock"); + lock.lock(); + assertThat(lock.isLocked()).isTrue(); + lock.unlock(); + assertThat(lock.isLocked()).isFalse(); + redisson.shutdown(); + }); } - @Test public void testAutoExpire() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1);