|
|
|
@ -191,8 +191,7 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
|
|
|
|
|
public <T> T get(final Class<T> remoteInterface, RemoteInvocationOptions options) {
|
|
|
|
|
// local copy of the options, to prevent mutation
|
|
|
|
|
final long ackTimeoutInMillis = options.isAckExpected() ? options.getAckTimeoutInMillis() : -1;
|
|
|
|
|
final long executionTimeoutInMillis = options.isResultExpected() ? options.getExecutionTimeoutInMillis() : -1;
|
|
|
|
|
final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
|
|
|
|
|
final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId();
|
|
|
|
|
InvocationHandler handler = new InvocationHandler() {
|
|
|
|
|
@Override
|
|
|
|
@ -205,36 +204,40 @@ public class RedissonRemoteService implements RRemoteService {
|
|
|
|
|
return toString.hashCode();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (executionTimeoutInMillis < 0 && !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE)))
|
|
|
|
|
if (!optionsCopy.isResultExpected() && !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE)))
|
|
|
|
|
throw new IllegalArgumentException("The noResult option only supports void return value");
|
|
|
|
|
|
|
|
|
|
String requestId = generateRequestId();
|
|
|
|
|
|
|
|
|
|
String requestQueueName = name + ":{" + remoteInterface.getName() + "}";
|
|
|
|
|
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
|
|
|
|
|
RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args,
|
|
|
|
|
ackTimeoutInMillis, executionTimeoutInMillis, System.currentTimeMillis());
|
|
|
|
|
RemoteServiceRequest request = new RemoteServiceRequest(requestId,
|
|
|
|
|
method.getName(),
|
|
|
|
|
args,
|
|
|
|
|
optionsCopy.isAckExpected() ? optionsCopy.getAckTimeoutInMillis() : -1,
|
|
|
|
|
optionsCopy.isResultExpected() ? optionsCopy.getExecutionTimeoutInMillis() : -1,
|
|
|
|
|
System.currentTimeMillis());
|
|
|
|
|
requestQueue.add(request);
|
|
|
|
|
|
|
|
|
|
RBlockingQueue<RRemoteServiceResponse> responseQueue = null;
|
|
|
|
|
if (ackTimeoutInMillis >= 0 || executionTimeoutInMillis >= 0) {
|
|
|
|
|
if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) {
|
|
|
|
|
String responseName = name + ":{" + remoteInterface.getName() + "}:" + requestId;
|
|
|
|
|
responseQueue = redisson.getBlockingQueue(responseName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// negative ackTimeout means unacknowledged call, do not poll for the ack
|
|
|
|
|
if (ackTimeoutInMillis >= 0) {
|
|
|
|
|
RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(ackTimeoutInMillis, TimeUnit.MILLISECONDS);
|
|
|
|
|
// poll for the ack only if expected
|
|
|
|
|
if (optionsCopy.isAckExpected()) {
|
|
|
|
|
RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
|
|
|
|
|
if (ack == null) {
|
|
|
|
|
throw new RemoteServiceAckTimeoutException("No ACK response after " + ackTimeoutInMillis + "ms for request: " + request);
|
|
|
|
|
throw new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// negative executionTimeout means fire-and-forget call, do not poll for the response
|
|
|
|
|
if (executionTimeoutInMillis >= 0) {
|
|
|
|
|
RemoteServiceResponse response = (RemoteServiceResponse) responseQueue.poll(executionTimeoutInMillis, TimeUnit.MILLISECONDS);
|
|
|
|
|
// poll for the response only if expected
|
|
|
|
|
if (optionsCopy.isResultExpected()) {
|
|
|
|
|
RemoteServiceResponse response = (RemoteServiceResponse) responseQueue.poll(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS);
|
|
|
|
|
if (response == null) {
|
|
|
|
|
throw new RemoteServiceTimeoutException("No response after " + executionTimeoutInMillis + "ms for request: " + request);
|
|
|
|
|
throw new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
|
|
|
|
|
}
|
|
|
|
|
if (response.getError() != null) {
|
|
|
|
|
throw response.getError();
|
|
|
|
|