diff --git a/src/test/java/org/redisson/BaseConcurrentTest.java b/src/test/java/org/redisson/BaseConcurrentTest.java index 2d23d62c6..7f7549c6d 100644 --- a/src/test/java/org/redisson/BaseConcurrentTest.java +++ b/src/test/java/org/redisson/BaseConcurrentTest.java @@ -2,6 +2,8 @@ package org.redisson; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; @@ -28,9 +30,7 @@ public abstract class BaseConcurrentTest extends BaseTest { pool.submit(() -> { IntStream.range(0, iterations) .parallel() - .forEach((i) -> { - runnable.run(instances.get(i)); - }); + .forEach((i) -> runnable.run(instances.get(i))); }); pool.shutdown(); @@ -50,6 +50,36 @@ public abstract class BaseConcurrentTest extends BaseTest { Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES)); } + protected void testMultiInstanceConcurrencySequentiallyLaunched(int iterations, final RedissonRunnable runnable) throws InterruptedException { + System.out.println("Multi Instance Concurrent Job Interation: " + iterations); + ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); + + final Map instances = new HashMap(); + for (int i = 0; i < iterations; i++) { + instances.put(i, BaseTest.createInstance()); + } + + long watch = System.currentTimeMillis(); + for (int i = 0; i < iterations; i++) { + final int n = i; + executor.execute(() -> runnable.run(instances.get(n))); + } + + executor.shutdown(); + Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); + + System.out.println("multi: " + (System.currentTimeMillis() - watch)); + + executor = Executors.newCachedThreadPool(); + + for (final RedissonClient redisson : instances.values()) { + executor.execute(() -> redisson.shutdown()); + } + + executor.shutdown(); + Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); + } + protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { System.out.println("Single Instance Concurrent Job Interation: " + iterations); final RedissonClient r = BaseTest.createInstance(); diff --git a/src/test/java/org/redisson/RedissonSemaphoreTest.java b/src/test/java/org/redisson/RedissonSemaphoreTest.java index bca899307..aad574693 100644 --- a/src/test/java/org/redisson/RedissonSemaphoreTest.java +++ b/src/test/java/org/redisson/RedissonSemaphoreTest.java @@ -234,7 +234,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { final AtomicInteger checkPermits = new AtomicInteger(s.availablePermits()); final CyclicBarrier barrier = new CyclicBarrier(s.availablePermits()); - testMultiInstanceConcurrency(iterations, r -> { + testMultiInstanceConcurrencySequentiallyLaunched(iterations, r -> { RSemaphore s1 = r.getSemaphore("test"); try { s1.acquire();