RRemoteService.register method with custom executors amount added. #434

pull/499/head
Nikita 9 years ago
parent 43e632bfdf
commit 58472c6cf9

@ -51,21 +51,31 @@ public class RedissonRemoteService implements RRemoteService {
} }
@Override @Override
public <T> void register(Class<T> serviceInterface, T object) { public <T> void register(Class<T> remoteInterface, T object) {
for (Method method : serviceInterface.getMethods()) { register(remoteInterface, object, 1);
}
@Override
public <T> void register(Class<T> remoteInterface, T object, int executorsAmount) {
if (executorsAmount < 1) {
throw new IllegalArgumentException("executorsAmount can't be lower than 1");
}
for (Method method : remoteInterface.getMethods()) {
RemoteServiceMethod value = new RemoteServiceMethod(method, object); RemoteServiceMethod value = new RemoteServiceMethod(method, object);
RemoteServiceKey key = new RemoteServiceKey(serviceInterface, method.getName()); RemoteServiceKey key = new RemoteServiceKey(remoteInterface, method.getName());
if (beans.put(key, value) != null) { if (beans.put(key, value) != null) {
return; return;
} }
} }
String requestQueueName = "redisson_remote_service:{" + serviceInterface.getName() + "}"; for (int i = 0; i < executorsAmount; i++) {
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName); String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}";
subscribe(serviceInterface, requestQueue); RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
subscribe(remoteInterface, requestQueue);
}
} }
private <T> void subscribe(final Class<T> serviceInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue) { private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue) {
Future<RemoteServiceRequest> take = requestQueue.takeAsync(); Future<RemoteServiceRequest> take = requestQueue.takeAsync();
futures.add(take); futures.add(take);
take.addListener(new FutureListener<RemoteServiceRequest>() { take.addListener(new FutureListener<RemoteServiceRequest>() {
@ -76,8 +86,8 @@ public class RedissonRemoteService implements RRemoteService {
} }
RemoteServiceRequest request = future.getNow(); RemoteServiceRequest request = future.getNow();
RemoteServiceMethod method = beans.get(new RemoteServiceKey(serviceInterface, request.getMethodName())); RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName()));
String responseName = "redisson_remote_service:{" + serviceInterface.getName() + "}:" + request.getRequestId(); String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + request.getRequestId();
RTopic<RemoteServiceResponse> topic = redisson.getTopic(responseName); RTopic<RemoteServiceResponse> topic = redisson.getTopic(responseName);
RemoteServiceResponse response; RemoteServiceResponse response;
try { try {
@ -94,23 +104,23 @@ public class RedissonRemoteService implements RRemoteService {
} }
futures.remove(future); futures.remove(future);
subscribe(serviceInterface, requestQueue); subscribe(remoteInterface, requestQueue);
} }
}); });
} }
@Override @Override
public <T> T get(final Class<T> serviceInterface) { public <T> T get(final Class<T> remoteInterface) {
InvocationHandler handler = new InvocationHandler() { InvocationHandler handler = new InvocationHandler() {
@Override @Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String requestId = generateRequestId(); String requestId = generateRequestId();
String requestQueueName = "redisson_remote_service:{" + serviceInterface.getName() + "}"; String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}";
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName); RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
requestQueue.add(new RemoteServiceRequest(requestId, method.getName(), args)); requestQueue.add(new RemoteServiceRequest(requestId, method.getName(), args));
String responseName = "redisson_remote_service:{" + serviceInterface.getName() + "}:" + requestId; String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + requestId;
final RTopic<RemoteServiceResponse> topic = redisson.getTopic(responseName); final RTopic<RemoteServiceResponse> topic = redisson.getTopic(responseName);
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<RemoteServiceResponse> response = new AtomicReference<RemoteServiceResponse>(); final AtomicReference<RemoteServiceResponse> response = new AtomicReference<RemoteServiceResponse>();
@ -131,7 +141,7 @@ public class RedissonRemoteService implements RRemoteService {
return msg.getResult(); return msg.getResult();
} }
}; };
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[] {serviceInterface}, handler); return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] {remoteInterface}, handler);
} }
private String generateRequestId() { private String generateRequestId() {

@ -18,13 +18,22 @@ package org.redisson.core;
public interface RRemoteService { public interface RRemoteService {
/** /**
* Register object as remote service * Register remote service with single executor
* *
* @param remoteInterface * @param remoteInterface
* @param object * @param object
*/ */
<T> void register(Class<T> remoteInterface, T object); <T> void register(Class<T> remoteInterface, T object);
/**
* Register remote service with custom executors amount
*
* @param remoteInterface
* @param object
* @param executorsAmount
*/
<T> void register(Class<T> remoteInterface, T object, int executorsAmount);
/** /**
* Get remote service object for remote invocations * Get remote service object for remote invocations
* *

Loading…
Cancel
Save