diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 5739bc7bc..de3a09a62 100644 --- a/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -3,7 +3,9 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Test; @@ -59,6 +61,88 @@ public class RedissonRemoteServiceTest extends BaseTest { } + @Test + public void testExecutorsAmountConcurrency() throws InterruptedException { + + // Redisson server and client + final RedissonClient server = Redisson.create(); + final RedissonClient client = Redisson.create(); + + final int serverAmount = 1; + final int clientAmount = 10; + + // to store the current call concurrency count + final AtomicInteger concurrency = new AtomicInteger(0); + + // a flag to indicate the the allowed concurrency was exceeded + final AtomicBoolean concurrencyIsExceeded = new AtomicBoolean(false); + + // the server: register a service with an overrided timeoutMethod method that: + // - incr the concurrency + // - check if concurrency is greater than what was allowed, and if yes set the concurrencyOfOneIsExceeded flag + // - wait 2s + // - decr the concurrency + server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl() { + @Override + public void timeoutMethod() throws InterruptedException { + try { + if (concurrency.incrementAndGet() > serverAmount) { + concurrencyIsExceeded.compareAndSet(false, true); + } + super.timeoutMethod(); + } finally { + concurrency.decrementAndGet(); + } + } + }, serverAmount); + + // a latch to force the client threads to execute simultaneously + // (as far as practicable, this is hard to predict) + final CountDownLatch readyLatch = new CountDownLatch(1); + + // the client: starts a couple of threads that will: + // - await for the ready latch + // - then call timeoutMethod + Future[] clientFutures = new Future[clientAmount]; + ExecutorService executor = Executors.newFixedThreadPool(clientAmount); + for (int i = 0; i < clientAmount; i++) { + clientFutures[i] = executor.submit(new Runnable() { + @Override + public void run() { + try { + RemoteInterface ri = client.getRemoteSerivce().get(RemoteInterface.class, clientAmount * 3, TimeUnit.SECONDS, clientAmount * 3, TimeUnit.SECONDS); + readyLatch.await(); + ri.timeoutMethod(); + } catch (InterruptedException e) { + // ignore + } + } + }); + } + + // open the latch to wake the threads + readyLatch.countDown(); + + // await for the client threads to terminate + for (Future clientFuture : clientFutures) { + try { + clientFuture.get(); + } catch (ExecutionException e) { + // ignore + } + } + executor.shutdown(); + executor.awaitTermination(clientAmount * 3, TimeUnit.SECONDS); + + // shutdown the server and the client + server.shutdown(); + client.shutdown(); + + // do the concurrencyIsExceeded flag was set ? + // if yes, that would indicate that the server exceeded its expected concurrency + assertThat(concurrencyIsExceeded.get()).isEqualTo(false); + } + @Test(expected = RemoteServiceTimeoutException.class) public void testTimeout() throws InterruptedException { RedissonClient r1 = Redisson.create();