diff --git a/src/test/java/org/redisson/BaseConcurrentTest.java b/src/test/java/org/redisson/BaseConcurrentTest.java index 08979add2..b70654b07 100644 --- a/src/test/java/org/redisson/BaseConcurrentTest.java +++ b/src/test/java/org/redisson/BaseConcurrentTest.java @@ -2,76 +2,68 @@ package org.redisson; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import org.junit.Assert; +import org.redisson.client.RedisClient; public abstract class BaseConcurrentTest extends BaseTest { protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { - ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2); + ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2); + final Map instances = new HashMap<>(); - final Map instances = new HashMap(); - for (int i = 0; i < iterations; i++) { - instances.put(i, BaseTest.createInstance()); - } + pool.submit(() -> { + IntStream.range(0, iterations) + .parallel() + .forEach((i) -> instances.put(i, BaseTest.createInstance())); + }); long watch = System.currentTimeMillis(); - for (int i = 0; i < iterations; i++) { - final int n = i; - executor.execute(new Runnable() { - @Override - public void run() { - RedissonClient redisson = instances.get(n); - runnable.run(redisson); - } - }); - } - - executor.shutdown(); - Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); + pool.awaitQuiescence(5, TimeUnit.MINUTES); + + pool.submit(() -> { + IntStream.range(0, iterations) + .parallel() + .forEach((i) -> runnable.run(instances.get(i))); + }); + + pool.shutdown(); + Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES)); System.out.println("multi: " + (System.currentTimeMillis() - watch)); - executor = Executors.newCachedThreadPool(); + pool = new ForkJoinPool(); - for (final RedissonClient redisson : instances.values()) { - executor.execute(new Runnable() { - @Override - public void run() { - redisson.shutdown(); - } - }); - } + pool.submit(() -> { + instances.values() + .parallelStream() + .forEach((r) -> r.shutdown()); + }); - executor.shutdown(); - Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); + pool.shutdown(); + Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES)); } protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { - ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2); - - final RedissonClient redisson = BaseTest.createInstance(); + final RedissonClient r = BaseTest.createInstance(); long watch = System.currentTimeMillis(); - for (int i = 0; i < iterations; i++) { - executor.execute(new Runnable() { - @Override - public void run() { - runnable.run(redisson); - } - }); - } - - executor.shutdown(); - Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); - System.out.println(System.currentTimeMillis() - watch); + ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2); + pool.submit(() -> { + IntStream.range(0, iterations) + .parallel() + .forEach((i) -> runnable.run(r)); + }); - redisson.shutdown(); - } + pool.shutdown(); + Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES)); + System.out.println(System.currentTimeMillis() - watch); + r.shutdown(); + } } diff --git a/src/test/java/org/redisson/BaseTest.java b/src/test/java/org/redisson/BaseTest.java index 6cbfa57cf..46f729cda 100644 --- a/src/test/java/org/redisson/BaseTest.java +++ b/src/test/java/org/redisson/BaseTest.java @@ -45,7 +45,8 @@ public abstract class BaseTest { } @Before - public void before() { + public void before() throws InterruptedException { + Thread.sleep(5000l); redisson.getKeys().flushall(); } }