From 9eeb4c05e42ec497ff44d1fb5e28d3c66c7c6663 Mon Sep 17 00:00:00 2001 From: Rui Gu Date: Thu, 28 Apr 2016 14:11:12 +0100 Subject: [PATCH] Revert "ConcurrentTest optimisation" This reverts commit d9610a2f29e184c99b6a0dca9385f8182b183bd6. --- .../java/org/redisson/BaseConcurrentTest.java | 65 +++++++++++++++---- .../org/redisson/RedissonSemaphoreTest.java | 2 +- 2 files changed, 52 insertions(+), 15 deletions(-) diff --git a/src/test/java/org/redisson/BaseConcurrentTest.java b/src/test/java/org/redisson/BaseConcurrentTest.java index d31120e5a..7f7549c6d 100644 --- a/src/test/java/org/redisson/BaseConcurrentTest.java +++ b/src/test/java/org/redisson/BaseConcurrentTest.java @@ -14,6 +14,43 @@ import org.redisson.client.RedisClient; public abstract class BaseConcurrentTest extends BaseTest { protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { + System.out.println("Multi Instance Concurrent Job Interation: " + iterations); + ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2); + final Map instances = new HashMap<>(); + + pool.submit(() -> { + IntStream.range(0, iterations) + .parallel() + .forEach((i) -> instances.put(i, BaseTest.createInstance())); + }); + + long watch = System.currentTimeMillis(); + pool.awaitQuiescence(5, TimeUnit.MINUTES); + + pool.submit(() -> { + IntStream.range(0, iterations) + .parallel() + .forEach((i) -> runnable.run(instances.get(i))); + }); + + pool.shutdown(); + Assert.assertTrue(pool.awaitTermination(RedissonRuntimeEnvironment.isTravis ? 10 : 3, TimeUnit.MINUTES)); + + System.out.println("multi: " + (System.currentTimeMillis() - watch)); + + pool = new ForkJoinPool(); + + pool.submit(() -> { + instances.values() + .parallelStream() + .forEach((r) -> r.shutdown()); + }); + + pool.shutdown(); + Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES)); + } + + protected void testMultiInstanceConcurrencySequentiallyLaunched(int iterations, final RedissonRunnable runnable) throws InterruptedException { System.out.println("Multi Instance Concurrent Job Interation: " + iterations); ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); @@ -45,25 +82,25 @@ public abstract class BaseConcurrentTest extends BaseTest { protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { System.out.println("Single Instance Concurrent Job Interation: " + iterations); - ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); - - final RedissonClient redisson = BaseTest.createInstance(); + final RedissonClient r = BaseTest.createInstance(); long watch = System.currentTimeMillis(); - for (int i = 0; i < iterations; i++) { - executor.execute(new Runnable() { - @Override - public void run() { - runnable.run(redisson); - } - }); - } - executor.shutdown(); - Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); + ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2); + + pool.submit(() -> { + IntStream.range(0, iterations) + .parallel() + .forEach((i) -> { + runnable.run(r); + }); + }); + + pool.shutdown(); + Assert.assertTrue(pool.awaitTermination(RedissonRuntimeEnvironment.isTravis ? 10 : 3, TimeUnit.MINUTES)); System.out.println(System.currentTimeMillis() - watch); - redisson.shutdown(); + r.shutdown(); } } diff --git a/src/test/java/org/redisson/RedissonSemaphoreTest.java b/src/test/java/org/redisson/RedissonSemaphoreTest.java index bca899307..aad574693 100644 --- a/src/test/java/org/redisson/RedissonSemaphoreTest.java +++ b/src/test/java/org/redisson/RedissonSemaphoreTest.java @@ -234,7 +234,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { final AtomicInteger checkPermits = new AtomicInteger(s.availablePermits()); final CyclicBarrier barrier = new CyclicBarrier(s.availablePermits()); - testMultiInstanceConcurrency(iterations, r -> { + testMultiInstanceConcurrencySequentiallyLaunched(iterations, r -> { RSemaphore s1 = r.getSemaphore("test"); try { s1.acquire();