|
|
|
@ -150,7 +150,7 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
|
|
|
|
|
// send the ack only if expected
|
|
|
|
|
if (request.getOptions().isAckExpected()) {
|
|
|
|
|
String ackName = name + ":{" + remoteInterface.getName() + "}:ack";
|
|
|
|
|
String ackName = getAckName(remoteInterface.getName(), request.getRequestId());
|
|
|
|
|
Future<Boolean> ackClientsFuture = redisson.getScript().evalAsync(responseName, Mode.READ_WRITE, LongCodec.INSTANCE,
|
|
|
|
|
"if redis.call('setnx', KEYS[1], 1) == 1 then "
|
|
|
|
|
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
|
|
|
|
@ -159,9 +159,9 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
+ "return 1;"
|
|
|
|
|
+ "end;"
|
|
|
|
|
+ "return 0;", RScript.ReturnType.BOOLEAN, Arrays.<Object>asList(ackName, responseName),
|
|
|
|
|
getCodec().getValueEncoder().encode(new RemoteServiceAck()), request.getOptions().getAckTimeoutInMillis());
|
|
|
|
|
// Future<List<?>> ackClientsFuture = send(request.getOptions().getAckTimeoutInMillis(), responseName, new RemoteServiceAck());
|
|
|
|
|
// ackClientsFuture.addListener(new FutureListener<List<?>>() {
|
|
|
|
|
getCodec().getValueEncoder().encode(new RemoteServiceAck()),
|
|
|
|
|
request.getOptions().getAckTimeoutInMillis());
|
|
|
|
|
|
|
|
|
|
ackClientsFuture.addListener(new FutureListener<Boolean>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Boolean> future) throws Exception {
|
|
|
|
@ -267,7 +267,7 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
return sync(remoteInterface, options);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <T> T async(Class<T> remoteInterface, final RemoteInvocationOptions options, final String interfaceName) {
|
|
|
|
|
private <T> T async(final Class<T> remoteInterface, final RemoteInvocationOptions options, final String interfaceName) {
|
|
|
|
|
// local copy of the options, to prevent mutation
|
|
|
|
|
final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
|
|
|
|
|
final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId();
|
|
|
|
@ -282,8 +282,12 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
return toString.hashCode();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!optionsCopy.isResultExpected() && !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE)))
|
|
|
|
|
if (!optionsCopy.isResultExpected()
|
|
|
|
|
&& !(method.getReturnType().equals(Void.class)
|
|
|
|
|
|| method.getReturnType().equals(Void.TYPE)
|
|
|
|
|
|| method.getReturnType().equals(Future.class))) {
|
|
|
|
|
throw new IllegalArgumentException("The noResult option only supports void return value");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final String requestId = generateRequestId();
|
|
|
|
|
final Promise<Object> result = ImmediateEventExecutor.INSTANCE.newPromise();
|
|
|
|
@ -310,19 +314,19 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
responseQueue = null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// poll for the ack only if expected
|
|
|
|
|
if (optionsCopy.isAckExpected()) {
|
|
|
|
|
final String ackName = name + ":{" + interfaceName + "}:ack";
|
|
|
|
|
Future<RemoteServiceAck> ackFuture = (Future<RemoteServiceAck>) responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
|
|
|
|
|
ackFuture.addListener(new FutureListener<RemoteServiceAck>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
result.setFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RemoteServiceAck ack = future.getNow();
|
|
|
|
|
if (ack == null) {
|
|
|
|
|
final String ackName = getAckName(remoteInterface.getName(), request.getRequestId());
|
|
|
|
|
Future<RemoteServiceAck> ackFutureAttempt = tryPollAckAgainAsync(optionsCopy, responseQueue, ackName);
|
|
|
|
|
ackFutureAttempt.addListener(new FutureListener<RemoteServiceAck>() {
|
|
|
|
|
|
|
|
|
@ -343,9 +347,25 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
} else {
|
|
|
|
|
invokeAsync(optionsCopy, result, request, responseQueue, ackName);
|
|
|
|
|
invokeAsync(optionsCopy, result, request, responseQueue);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
});
|
|
|
|
|
} else {
|
|
|
|
|
if (optionsCopy.isResultExpected()) {
|
|
|
|
|
invokeAsync(optionsCopy, result, request, responseQueue);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
};
|
|
|
|
|
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void invokeAsync(final RemoteInvocationOptions optionsCopy,
|
|
|
|
|
final Promise<Object> result, final RemoteServiceRequest request,
|
|
|
|
@ -360,6 +380,14 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
invokeAsync(optionsCopy, result, request, responseQueue);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void invokeAsync(final RemoteInvocationOptions optionsCopy, final Promise<Object> result,
|
|
|
|
|
final RemoteServiceRequest request,
|
|
|
|
|
final RBlockingQueue<? extends RRemoteServiceResponse> responseQueue) {
|
|
|
|
|
// poll for the response only if expected
|
|
|
|
|
if (optionsCopy.isResultExpected()) {
|
|
|
|
|
Future<RemoteServiceResponse> responseFuture = (Future<RemoteServiceResponse>) responseQueue.pollAsync(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS);
|
|
|
|
@ -387,23 +415,8 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
};
|
|
|
|
|
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private <T> T sync(Class<T> remoteInterface, final RemoteInvocationOptions options) {
|
|
|
|
|
final String interfaceName = remoteInterface.getName();
|
|
|
|
|
// local copy of the options, to prevent mutation
|
|
|
|
@ -439,7 +452,7 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
|
|
|
|
|
// poll for the ack only if expected
|
|
|
|
|
if (optionsCopy.isAckExpected()) {
|
|
|
|
|
String ackName = name + ":{" + interfaceName + "}:ack";
|
|
|
|
|
String ackName = getAckName(interfaceName, requestId);
|
|
|
|
|
RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
|
|
|
|
|
if (ack == null) {
|
|
|
|
|
ack = tryPollAckAgain(optionsCopy, responseQueue, ackName);
|
|
|
|
@ -469,6 +482,10 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getAckName(String interfaceName, String requestId) {
|
|
|
|
|
return name + ":{" + interfaceName + "}:" + requestId + ":ack";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy,
|
|
|
|
|
RBlockingQueue<? extends RRemoteServiceResponse> responseQueue, String ackName) throws InterruptedException {
|
|
|
|
|
Future<Boolean> ackClientsFuture = redisson.getScript().evalAsync(ackName, Mode.READ_WRITE, LongCodec.INSTANCE,
|
|
|
|
@ -477,7 +494,8 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
+ "return 0;"
|
|
|
|
|
+ "end;"
|
|
|
|
|
+ "redis.call('del', KEYS[1]);"
|
|
|
|
|
+ "return 1;", RScript.ReturnType.BOOLEAN, Arrays.<Object>asList(ackName), optionsCopy.getAckTimeoutInMillis());
|
|
|
|
|
+ "return 1;",
|
|
|
|
|
RScript.ReturnType.BOOLEAN, Arrays.<Object>asList(ackName), optionsCopy.getAckTimeoutInMillis());
|
|
|
|
|
|
|
|
|
|
ackClientsFuture.sync();
|
|
|
|
|
if (ackClientsFuture.getNow()) {
|
|
|
|
@ -491,8 +509,12 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
throws InterruptedException {
|
|
|
|
|
final Promise<RemoteServiceAck> promise = ImmediateEventExecutor.INSTANCE.newPromise();
|
|
|
|
|
Future<Boolean> ackClientsFuture = redisson.getScript().evalAsync(ackName, Mode.READ_WRITE, LongCodec.INSTANCE,
|
|
|
|
|
"if redis.call('setnx', KEYS[1], 1) == 1 then " + "redis.call('pexpire', KEYS[1], ARGV[1]);"
|
|
|
|
|
+ "return 0;" + "end;" + "redis.call('del', KEYS[1]);" + "return 1;",
|
|
|
|
|
"if redis.call('setnx', KEYS[1], 1) == 1 then "
|
|
|
|
|
+ "redis.call('pexpire', KEYS[1], ARGV[1]);"
|
|
|
|
|
+ "return 0;"
|
|
|
|
|
+ "end;"
|
|
|
|
|
+ "redis.call('del', KEYS[1]);"
|
|
|
|
|
+ "return 1;",
|
|
|
|
|
RScript.ReturnType.BOOLEAN, Arrays.<Object> asList(ackName), optionsCopy.getAckTimeoutInMillis());
|
|
|
|
|
ackClientsFuture.addListener(new FutureListener<Boolean>() {
|
|
|
|
|
@Override
|
|
|
|
|