|
|
|
@ -111,11 +111,23 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < executorsAmount; i++) {
|
|
|
|
|
String requestQueueName = name + ":{" + remoteInterface.getName() + "}";
|
|
|
|
|
String requestQueueName = getRequestQueueName(remoteInterface);
|
|
|
|
|
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec());
|
|
|
|
|
subscribe(remoteInterface, requestQueue);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getAckName(Class<?> remoteInterface, String requestId) {
|
|
|
|
|
return "{" + name + ":" + remoteInterface.getName() + "}:" + requestId + ":ack";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getResponseQueueName(Class<?> remoteInterface, String requestId) {
|
|
|
|
|
return "{" + name + ":" + remoteInterface.getName() + "}:" + requestId;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getRequestQueueName(Class<?> remoteInterface) {
|
|
|
|
|
return "{" + name + ":" + remoteInterface.getName() + "}";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Codec getCodec() {
|
|
|
|
|
if (codec != null) {
|
|
|
|
@ -159,7 +171,7 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName()));
|
|
|
|
|
final String responseName = name + ":{" + remoteInterface.getName() + "}:" + request.getRequestId();
|
|
|
|
|
final String responseName = getResponseQueueName(remoteInterface, request.getRequestId());
|
|
|
|
|
|
|
|
|
|
// send the ack only if expected
|
|
|
|
|
if (request.getOptions().isAckExpected()) {
|
|
|
|
@ -306,8 +318,8 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
|
|
|
|
|
final String requestId = generateRequestId();
|
|
|
|
|
|
|
|
|
|
final String requestQueueName = name + ":{" + syncInterface.getName() + "}";
|
|
|
|
|
final String responseName = name + ":{" + syncInterface.getName() + "}:" + requestId;
|
|
|
|
|
final String requestQueueName = getRequestQueueName(syncInterface);
|
|
|
|
|
final String responseName = getResponseQueueName(syncInterface, requestId);
|
|
|
|
|
final String ackName = getAckName(syncInterface, requestId);
|
|
|
|
|
|
|
|
|
|
final RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec());
|
|
|
|
@ -475,7 +487,7 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
|
|
|
|
|
String requestId = generateRequestId();
|
|
|
|
|
|
|
|
|
|
String requestQueueName = name + ":{" + interfaceName + "}";
|
|
|
|
|
String requestQueueName = getRequestQueueName(remoteInterface);
|
|
|
|
|
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec());
|
|
|
|
|
RemoteServiceRequest request = new RemoteServiceRequest(requestId,
|
|
|
|
|
method.getName(), args, optionsCopy, System.currentTimeMillis());
|
|
|
|
@ -483,7 +495,7 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
|
|
|
|
|
RBlockingQueue<RRemoteServiceResponse> responseQueue = null;
|
|
|
|
|
if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) {
|
|
|
|
|
String responseName = name + ":{" + interfaceName + "}:" + requestId;
|
|
|
|
|
String responseName = getResponseQueueName(remoteInterface, requestId);
|
|
|
|
|
responseQueue = redisson.getBlockingQueue(responseName, getCodec());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -519,10 +531,6 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getAckName(Class<?> remoteInterface, String requestId) {
|
|
|
|
|
return name + ":{" + remoteInterface.getName() + "}:" + requestId + ":ack";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy,
|
|
|
|
|
RBlockingQueue<? extends RRemoteServiceResponse> responseQueue, String ackName) throws InterruptedException {
|
|
|
|
|
Future<Boolean> ackClientsFuture = redisson.getScript().evalAsync(ackName, Mode.READ_WRITE, LongCodec.INSTANCE,
|
|
|
|
|