|
|
|
@ -56,9 +56,15 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();
|
|
|
|
|
|
|
|
|
|
private final Redisson redisson;
|
|
|
|
|
private final String name;
|
|
|
|
|
|
|
|
|
|
public RedissonRemoteService(Redisson redisson) {
|
|
|
|
|
this(redisson, "redisson_remote_service");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RedissonRemoteService(Redisson redisson, String name) {
|
|
|
|
|
this.redisson = redisson;
|
|
|
|
|
this.name = name;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -80,7 +86,7 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < executorsAmount; i++) {
|
|
|
|
|
String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}";
|
|
|
|
|
String requestQueueName = name + ":{" + remoteInterface.getName() + "}";
|
|
|
|
|
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
|
|
|
|
|
subscribe(remoteInterface, requestQueue);
|
|
|
|
|
}
|
|
|
|
@ -107,7 +113,7 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName()));
|
|
|
|
|
final String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + request.getRequestId();
|
|
|
|
|
final String responseName = name + ":{" + remoteInterface.getName() + "}:" + request.getRequestId();
|
|
|
|
|
|
|
|
|
|
Future<List<?>> ackClientsFuture = send(request.getAckTimeout(), responseName, new RemoteServiceAck());
|
|
|
|
|
ackClientsFuture.addListener(new FutureListener<List<?>>() {
|
|
|
|
@ -167,13 +173,13 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
|
|
|
|
|
String requestId = generateRequestId();
|
|
|
|
|
|
|
|
|
|
String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}";
|
|
|
|
|
String requestQueueName = name + ":{" + remoteInterface.getName() + "}";
|
|
|
|
|
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
|
|
|
|
|
RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args,
|
|
|
|
|
ackTimeUnit.toMillis(ackTimeout), executionTimeUnit.toMillis(executionTimeout), System.currentTimeMillis());
|
|
|
|
|
requestQueue.add(request);
|
|
|
|
|
|
|
|
|
|
String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + requestId;
|
|
|
|
|
String responseName = name + ":{" + remoteInterface.getName() + "}:" + requestId;
|
|
|
|
|
RBlockingQueue<RRemoteServiceResponse> responseQueue = redisson.getBlockingQueue(responseName);
|
|
|
|
|
|
|
|
|
|
RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(ackTimeout, ackTimeUnit);
|
|
|
|
|