diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 1ee3158dd..17cc2a543 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -23,6 +23,7 @@ import java.lang.reflect.Proxy; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -98,8 +99,12 @@ public class RedissonRemoteService implements RRemoteService { } @Override - public void register(Class remoteInterface, T object, int executorsAmount) { - if (executorsAmount < 1) { + public void register(Class remoteInterface, T object, int workersAmount) { + register(remoteInterface, object, workersAmount, null); + } + + public void register(Class remoteInterface, T object, int workersAmount, Executor executor) { + if (workersAmount < 1) { throw new IllegalArgumentException("executorsAmount can't be lower than 1"); } for (Method method : remoteInterface.getMethods()) { @@ -110,10 +115,10 @@ public class RedissonRemoteService implements RRemoteService { } } - for (int i = 0; i < executorsAmount; i++) { + for (int i = 0; i < workersAmount; i++) { String requestQueueName = getRequestQueueName(remoteInterface); RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec()); - subscribe(remoteInterface, requestQueue); + subscribe(remoteInterface, requestQueue, executor); } } @@ -144,7 +149,7 @@ public class RedissonRemoteService implements RRemoteService { } } - private void subscribe(final Class remoteInterface, final RBlockingQueue requestQueue) { + private void subscribe(final Class remoteInterface, final RBlockingQueue requestQueue, final Executor executor) { Future take = requestQueue.takeAsync(); take.addListener(new FutureListener() { @Override @@ -154,7 +159,7 @@ public class RedissonRemoteService implements RRemoteService { return; } // re-subscribe after a failed takeAsync - subscribe(remoteInterface, requestQueue); + subscribe(remoteInterface, requestQueue, executor); return; } @@ -166,7 +171,7 @@ public class RedissonRemoteService implements RRemoteService { if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request.getOptions().getAckTimeoutInMillis()) { log.debug("request: {} has been skipped due to ackTimeout"); // re-subscribe after a skipped ackTimeout - subscribe(remoteInterface, requestQueue); + subscribe(remoteInterface, requestQueue, executor); return; } @@ -196,27 +201,46 @@ public class RedissonRemoteService implements RRemoteService { return; } // re-subscribe after a failed send (ack) - subscribe(remoteInterface, requestQueue); + subscribe(remoteInterface, requestQueue, executor); return; } if (!future.getNow()) { - subscribe(remoteInterface, requestQueue); + subscribe(remoteInterface, requestQueue, executor); return; } - - invokeMethod(remoteInterface, requestQueue, request, method, responseName); + + if (executor != null) { + executor.execute(new Runnable() { + @Override + public void run() { + invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor); + } + }); + } else { + invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor); + } } }); } else { - invokeMethod(remoteInterface, requestQueue, request, method, responseName); + if (executor != null) { + executor.execute(new Runnable() { + @Override + public void run() { + invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor); + } + }); + } else { + invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor); + } } } }); } - private void invokeMethod(final Class remoteInterface, final RBlockingQueue requestQueue, final RemoteServiceRequest request, RemoteServiceMethod method, String responseName) { + private void invokeMethod(final Class remoteInterface, final RBlockingQueue requestQueue, + final RemoteServiceRequest request, RemoteServiceMethod method, String responseName, final Executor executor) { final AtomicReference responseHolder = new AtomicReference(); try { Object result = method.getMethod().invoke(method.getBean(), request.getArgs()); @@ -241,12 +265,12 @@ public class RedissonRemoteService implements RRemoteService { } } // re-subscribe anyways (fail or success) after the send (response) - subscribe(remoteInterface, requestQueue); + subscribe(remoteInterface, requestQueue, executor); } }); } else { // re-subscribe anyways after the method invocation - subscribe(remoteInterface, requestQueue); + subscribe(remoteInterface, requestQueue, executor); } } diff --git a/src/main/java/org/redisson/api/RRemoteService.java b/src/main/java/org/redisson/api/RRemoteService.java index 76e7bd28a..06b17356a 100644 --- a/src/main/java/org/redisson/api/RRemoteService.java +++ b/src/main/java/org/redisson/api/RRemoteService.java @@ -15,6 +15,7 @@ */ package org.redisson.api; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; /** @@ -58,7 +59,7 @@ import java.util.concurrent.TimeUnit; public interface RRemoteService { /** - * Register remote service with single executor + * Register remote service with single worker * * @param remoteInterface * @param object @@ -66,13 +67,23 @@ public interface RRemoteService { void register(Class remoteInterface, T object); /** - * Register remote service with custom executors amount + * Register remote service with custom workers amount * * @param remoteInterface * @param object - * @param executorsAmount + * @param workersAmount */ - void register(Class remoteInterface, T object, int executorsAmount); + void register(Class remoteInterface, T object, int workersAmount); + + /** + * Register remote service with custom workers amount + * and executor for running them + * + * @param remoteInterface + * @param object + * @param workersAmount + */ + void register(Class remoteInterface, T object, int workersAmount, Executor executor); /** * Get remote service object for remote invocations. diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 6f5a74154..4c8b08b51 100644 --- a/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -1,6 +1,18 @@ package org.redisson; -import io.netty.handler.codec.EncoderException; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.io.NotSerializableException; +import java.io.Serializable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + import org.junit.Assert; import org.junit.Test; import org.redisson.api.RemoteInvocationOptions; @@ -10,15 +22,7 @@ import org.redisson.remote.RRemoteAsync; import org.redisson.remote.RemoteServiceAckTimeoutException; import org.redisson.remote.RemoteServiceTimeoutException; -import java.io.IOException; -import java.io.NotSerializableException; -import java.io.Serializable; import io.netty.util.concurrent.Future; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.assertj.core.api.Assertions.assertThat; public class RedissonRemoteServiceTest extends BaseTest { @@ -183,6 +187,29 @@ public class RedissonRemoteServiceTest extends BaseTest { r1.shutdown(); r2.shutdown(); } + + @Test + public void testExecutorAsync() throws InterruptedException { + RedissonClient r1 = createInstance(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl(), 1, executor); + + RedissonClient r2 = createInstance(); + RemoteInterfaceAsync ri = r2.getRemoteSerivce().get(RemoteInterfaceAsync.class); + + Future f = ri.voidMethod("someName", 100L); + f.sync(); + Future resFuture = ri.resultMethod(100L); + resFuture.sync(); + assertThat(resFuture.getNow()).isEqualTo(200); + + r1.shutdown(); + r2.shutdown(); + + executor.shutdown(); + executor.awaitTermination(1, TimeUnit.MINUTES); + } + @Test public void testExecutorsAmountConcurrency() throws InterruptedException {