Fixed RedissonSemaphorTest

Added testMultiInstanceConcurrencySequentiallyLaunched method to launch
concurrent tests via for loop instead of stream.

Modified testConcurrency_MultiInstance_10_permits to use
testMultiInstanceConcurrencySequentiallyLaunched for concurrent test.
pull/509/head
jackygurui 9 years ago
parent bc01cf143a
commit 080ad65bfe

@ -2,6 +2,8 @@ package org.redisson;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@ -28,9 +30,7 @@ public abstract class BaseConcurrentTest extends BaseTest {
pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> {
runnable.run(instances.get(i));
});
.forEach((i) -> runnable.run(instances.get(i)));
});
pool.shutdown();
@ -50,6 +50,36 @@ public abstract class BaseConcurrentTest extends BaseTest {
Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES));
}
protected void testMultiInstanceConcurrencySequentiallyLaunched(int iterations, final RedissonRunnable runnable) throws InterruptedException {
System.out.println("Multi Instance Concurrent Job Interation: " + iterations);
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
final Map<Integer, RedissonClient> instances = new HashMap<Integer, RedissonClient>();
for (int i = 0; i < iterations; i++) {
instances.put(i, BaseTest.createInstance());
}
long watch = System.currentTimeMillis();
for (int i = 0; i < iterations; i++) {
final int n = i;
executor.execute(() -> runnable.run(instances.get(n)));
}
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
System.out.println("multi: " + (System.currentTimeMillis() - watch));
executor = Executors.newCachedThreadPool();
for (final RedissonClient redisson : instances.values()) {
executor.execute(() -> redisson.shutdown());
}
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
}
protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
System.out.println("Single Instance Concurrent Job Interation: " + iterations);
final RedissonClient r = BaseTest.createInstance();

@ -234,7 +234,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
final AtomicInteger checkPermits = new AtomicInteger(s.availablePermits());
final CyclicBarrier barrier = new CyclicBarrier(s.availablePermits());
testMultiInstanceConcurrency(iterations, r -> {
testMultiInstanceConcurrencySequentiallyLaunched(iterations, r -> {
RSemaphore s1 = r.getSemaphore("test");
try {
s1.acquire();

Loading…
Cancel
Save