diff --git a/src/test/java/org/redisson/BaseConcurrentTest.java b/src/test/java/org/redisson/BaseConcurrentTest.java index 1f142fec5..c863fb523 100644 --- a/src/test/java/org/redisson/BaseConcurrentTest.java +++ b/src/test/java/org/redisson/BaseConcurrentTest.java @@ -4,50 +4,50 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; -import java.util.stream.IntStream; import org.junit.Assert; -import org.redisson.client.RedisClient; public abstract class BaseConcurrentTest extends BaseTest { protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { - System.out.println("Multi Instance Concurrent Job Interation: " + iterations); - ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2); - final Map instances = new HashMap<>(); + ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2); - pool.submit(() -> { - IntStream.range(0, iterations) - .parallel() - .forEach((i) -> instances.put(i, BaseTest.createInstance())); - }); + final Map instances = new HashMap(); + for (int i = 0; i < iterations; i++) { + instances.put(i, BaseTest.createInstance()); + } long watch = System.currentTimeMillis(); - pool.awaitQuiescence(5, TimeUnit.MINUTES); - - pool.submit(() -> { - IntStream.range(0, iterations) - .parallel() - .forEach((i) -> runnable.run(instances.get(i))); - }); + for (int i = 0; i < iterations; i++) { + final int n = i; + executor.execute(new Runnable() { + @Override + public void run() { + RedissonClient redisson = instances.get(n); + runnable.run(redisson); + } + }); + } - pool.shutdown(); - Assert.assertTrue(pool.awaitTermination(RedissonRuntimeEnvironment.isTravis ? 10 : 3, TimeUnit.MINUTES)); + executor.shutdown(); + Assert.assertTrue(executor.awaitTermination(RedissonRuntimeEnvironment.isTravis ? 10 : 3, TimeUnit.MINUTES)); System.out.println("multi: " + (System.currentTimeMillis() - watch)); - pool = new ForkJoinPool(); + executor = Executors.newCachedThreadPool(); - pool.submit(() -> { - instances.values() - .parallelStream() - .forEach((r) -> r.shutdown()); - }); + for (final RedissonClient redisson : instances.values()) { + executor.execute(new Runnable() { + @Override + public void run() { + redisson.shutdown(); + } + }); + } - pool.shutdown(); - Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES)); + executor.shutdown(); + Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); } protected void testMultiInstanceConcurrencySequentiallyLaunched(int iterations, final RedissonRunnable runnable) throws InterruptedException { @@ -85,7 +85,6 @@ public abstract class BaseConcurrentTest extends BaseTest { final RedissonClient r = BaseTest.createInstance(); long watch = System.currentTimeMillis(); -// ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2); ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); for (int i = 0; i < iterations; i++) { @@ -93,13 +92,6 @@ public abstract class BaseConcurrentTest extends BaseTest { runnable.run(r); }); } -// pool.submit(() -> { -// IntStream.range(0, iterations) -// .parallel() -// .forEach((i) -> { -// runnable.run(r); -// }); -// }); pool.shutdown(); Assert.assertTrue(pool.awaitTermination(RedissonRuntimeEnvironment.isTravis ? 10 : 3, TimeUnit.MINUTES));