|
|
|
@ -3,7 +3,9 @@ package org.redisson;
|
|
|
|
|
import static org.assertj.core.api.Assertions.assertThat;
|
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.concurrent.*;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
|
|
|
|
import org.junit.Assert;
|
|
|
|
|
import org.junit.Test;
|
|
|
|
@ -59,6 +61,88 @@ public class RedissonRemoteServiceTest extends BaseTest {
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testExecutorsAmountConcurrency() throws InterruptedException {
|
|
|
|
|
|
|
|
|
|
// Redisson server and client
|
|
|
|
|
final RedissonClient server = Redisson.create();
|
|
|
|
|
final RedissonClient client = Redisson.create();
|
|
|
|
|
|
|
|
|
|
final int serverAmount = 1;
|
|
|
|
|
final int clientAmount = 10;
|
|
|
|
|
|
|
|
|
|
// to store the current call concurrency count
|
|
|
|
|
final AtomicInteger concurrency = new AtomicInteger(0);
|
|
|
|
|
|
|
|
|
|
// a flag to indicate the the allowed concurrency was exceeded
|
|
|
|
|
final AtomicBoolean concurrencyIsExceeded = new AtomicBoolean(false);
|
|
|
|
|
|
|
|
|
|
// the server: register a service with an overrided timeoutMethod method that:
|
|
|
|
|
// - incr the concurrency
|
|
|
|
|
// - check if concurrency is greater than what was allowed, and if yes set the concurrencyOfOneIsExceeded flag
|
|
|
|
|
// - wait 2s
|
|
|
|
|
// - decr the concurrency
|
|
|
|
|
server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl() {
|
|
|
|
|
@Override
|
|
|
|
|
public void timeoutMethod() throws InterruptedException {
|
|
|
|
|
try {
|
|
|
|
|
if (concurrency.incrementAndGet() > serverAmount) {
|
|
|
|
|
concurrencyIsExceeded.compareAndSet(false, true);
|
|
|
|
|
}
|
|
|
|
|
super.timeoutMethod();
|
|
|
|
|
} finally {
|
|
|
|
|
concurrency.decrementAndGet();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}, serverAmount);
|
|
|
|
|
|
|
|
|
|
// a latch to force the client threads to execute simultaneously
|
|
|
|
|
// (as far as practicable, this is hard to predict)
|
|
|
|
|
final CountDownLatch readyLatch = new CountDownLatch(1);
|
|
|
|
|
|
|
|
|
|
// the client: starts a couple of threads that will:
|
|
|
|
|
// - await for the ready latch
|
|
|
|
|
// - then call timeoutMethod
|
|
|
|
|
Future[] clientFutures = new Future[clientAmount];
|
|
|
|
|
ExecutorService executor = Executors.newFixedThreadPool(clientAmount);
|
|
|
|
|
for (int i = 0; i < clientAmount; i++) {
|
|
|
|
|
clientFutures[i] = executor.submit(new Runnable() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
|
try {
|
|
|
|
|
RemoteInterface ri = client.getRemoteSerivce().get(RemoteInterface.class, clientAmount * 3, TimeUnit.SECONDS, clientAmount * 3, TimeUnit.SECONDS);
|
|
|
|
|
readyLatch.await();
|
|
|
|
|
ri.timeoutMethod();
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
// ignore
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// open the latch to wake the threads
|
|
|
|
|
readyLatch.countDown();
|
|
|
|
|
|
|
|
|
|
// await for the client threads to terminate
|
|
|
|
|
for (Future clientFuture : clientFutures) {
|
|
|
|
|
try {
|
|
|
|
|
clientFuture.get();
|
|
|
|
|
} catch (ExecutionException e) {
|
|
|
|
|
// ignore
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
executor.shutdown();
|
|
|
|
|
executor.awaitTermination(clientAmount * 3, TimeUnit.SECONDS);
|
|
|
|
|
|
|
|
|
|
// shutdown the server and the client
|
|
|
|
|
server.shutdown();
|
|
|
|
|
client.shutdown();
|
|
|
|
|
|
|
|
|
|
// do the concurrencyIsExceeded flag was set ?
|
|
|
|
|
// if yes, that would indicate that the server exceeded its expected concurrency
|
|
|
|
|
assertThat(concurrencyIsExceeded.get()).isEqualTo(false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test(expected = RemoteServiceTimeoutException.class)
|
|
|
|
|
public void testTimeout() throws InterruptedException {
|
|
|
|
|
RedissonClient r1 = Redisson.create();
|
|
|
|
|