diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 0beebb71f..55455db33 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -51,21 +51,31 @@ public class RedissonRemoteService implements RRemoteService { } @Override - public void register(Class serviceInterface, T object) { - for (Method method : serviceInterface.getMethods()) { + public void register(Class remoteInterface, T object) { + register(remoteInterface, object, 1); + } + + @Override + public void register(Class remoteInterface, T object, int executorsAmount) { + if (executorsAmount < 1) { + throw new IllegalArgumentException("executorsAmount can't be lower than 1"); + } + for (Method method : remoteInterface.getMethods()) { RemoteServiceMethod value = new RemoteServiceMethod(method, object); - RemoteServiceKey key = new RemoteServiceKey(serviceInterface, method.getName()); + RemoteServiceKey key = new RemoteServiceKey(remoteInterface, method.getName()); if (beans.put(key, value) != null) { return; } } - - String requestQueueName = "redisson_remote_service:{" + serviceInterface.getName() + "}"; - RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); - subscribe(serviceInterface, requestQueue); + + for (int i = 0; i < executorsAmount; i++) { + String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}"; + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); + subscribe(remoteInterface, requestQueue); + } } - private void subscribe(final Class serviceInterface, final RBlockingQueue requestQueue) { + private void subscribe(final Class remoteInterface, final RBlockingQueue requestQueue) { Future take = requestQueue.takeAsync(); futures.add(take); take.addListener(new FutureListener() { @@ -76,8 +86,8 @@ public class RedissonRemoteService implements RRemoteService { } RemoteServiceRequest request = future.getNow(); - RemoteServiceMethod method = beans.get(new RemoteServiceKey(serviceInterface, request.getMethodName())); - String responseName = "redisson_remote_service:{" + serviceInterface.getName() + "}:" + request.getRequestId(); + RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName())); + String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + request.getRequestId(); RTopic topic = redisson.getTopic(responseName); RemoteServiceResponse response; try { @@ -94,23 +104,23 @@ public class RedissonRemoteService implements RRemoteService { } futures.remove(future); - subscribe(serviceInterface, requestQueue); + subscribe(remoteInterface, requestQueue); } }); } @Override - public T get(final Class serviceInterface) { + public T get(final Class remoteInterface) { InvocationHandler handler = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String requestId = generateRequestId(); - String requestQueueName = "redisson_remote_service:{" + serviceInterface.getName() + "}"; + String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}"; RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); requestQueue.add(new RemoteServiceRequest(requestId, method.getName(), args)); - String responseName = "redisson_remote_service:{" + serviceInterface.getName() + "}:" + requestId; + String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + requestId; final RTopic topic = redisson.getTopic(responseName); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference response = new AtomicReference(); @@ -131,7 +141,7 @@ public class RedissonRemoteService implements RRemoteService { return msg.getResult(); } }; - return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[] {serviceInterface}, handler); + return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] {remoteInterface}, handler); } private String generateRequestId() { diff --git a/src/main/java/org/redisson/core/RRemoteService.java b/src/main/java/org/redisson/core/RRemoteService.java index f7066dc9d..c322c512b 100644 --- a/src/main/java/org/redisson/core/RRemoteService.java +++ b/src/main/java/org/redisson/core/RRemoteService.java @@ -18,13 +18,22 @@ package org.redisson.core; public interface RRemoteService { /** - * Register object as remote service + * Register remote service with single executor * * @param remoteInterface * @param object */ void register(Class remoteInterface, T object); + /** + * Register remote service with custom executors amount + * + * @param remoteInterface + * @param object + * @param executorsAmount + */ + void register(Class remoteInterface, T object, int executorsAmount); + /** * Get remote service object for remote invocations *