Ack object used to prevent race-condition during ack receiving should be created per request.

pull/566/head
Nikita 9 years ago
parent 5569ed4122
commit a3e8af5995

@ -150,7 +150,7 @@ public class RedissonRemoteService implements RRemoteService {
// send the ack only if expected // send the ack only if expected
if (request.getOptions().isAckExpected()) { if (request.getOptions().isAckExpected()) {
String ackName = name + ":{" + remoteInterface.getName() + "}:ack"; String ackName = getAckName(remoteInterface.getName(), request.getRequestId());
Future<Boolean> ackClientsFuture = redisson.getScript().evalAsync(responseName, Mode.READ_WRITE, LongCodec.INSTANCE, Future<Boolean> ackClientsFuture = redisson.getScript().evalAsync(responseName, Mode.READ_WRITE, LongCodec.INSTANCE,
"if redis.call('setnx', KEYS[1], 1) == 1 then " "if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[2]);" + "redis.call('pexpire', KEYS[1], ARGV[2]);"
@ -159,9 +159,9 @@ public class RedissonRemoteService implements RRemoteService {
+ "return 1;" + "return 1;"
+ "end;" + "end;"
+ "return 0;", RScript.ReturnType.BOOLEAN, Arrays.<Object>asList(ackName, responseName), + "return 0;", RScript.ReturnType.BOOLEAN, Arrays.<Object>asList(ackName, responseName),
getCodec().getValueEncoder().encode(new RemoteServiceAck()), request.getOptions().getAckTimeoutInMillis()); getCodec().getValueEncoder().encode(new RemoteServiceAck()),
// Future<List<?>> ackClientsFuture = send(request.getOptions().getAckTimeoutInMillis(), responseName, new RemoteServiceAck()); request.getOptions().getAckTimeoutInMillis());
// ackClientsFuture.addListener(new FutureListener<List<?>>() {
ackClientsFuture.addListener(new FutureListener<Boolean>() { ackClientsFuture.addListener(new FutureListener<Boolean>() {
@Override @Override
public void operationComplete(Future<Boolean> future) throws Exception { public void operationComplete(Future<Boolean> future) throws Exception {
@ -267,7 +267,7 @@ public class RedissonRemoteService implements RRemoteService {
return sync(remoteInterface, options); return sync(remoteInterface, options);
} }
private <T> T async(Class<T> remoteInterface, final RemoteInvocationOptions options, final String interfaceName) { private <T> T async(final Class<T> remoteInterface, final RemoteInvocationOptions options, final String interfaceName) {
// local copy of the options, to prevent mutation // local copy of the options, to prevent mutation
final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options); final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId(); final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId();
@ -282,8 +282,12 @@ public class RedissonRemoteService implements RRemoteService {
return toString.hashCode(); return toString.hashCode();
} }
if (!optionsCopy.isResultExpected() && !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE))) if (!optionsCopy.isResultExpected()
&& !(method.getReturnType().equals(Void.class)
|| method.getReturnType().equals(Void.TYPE)
|| method.getReturnType().equals(Future.class))) {
throw new IllegalArgumentException("The noResult option only supports void return value"); throw new IllegalArgumentException("The noResult option only supports void return value");
}
final String requestId = generateRequestId(); final String requestId = generateRequestId();
final Promise<Object> result = ImmediateEventExecutor.INSTANCE.newPromise(); final Promise<Object> result = ImmediateEventExecutor.INSTANCE.newPromise();
@ -310,19 +314,19 @@ public class RedissonRemoteService implements RRemoteService {
responseQueue = null; responseQueue = null;
} }
// poll for the ack only if expected
if (optionsCopy.isAckExpected()) { if (optionsCopy.isAckExpected()) {
final String ackName = name + ":{" + interfaceName + "}:ack";
Future<RemoteServiceAck> ackFuture = (Future<RemoteServiceAck>) responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS); Future<RemoteServiceAck> ackFuture = (Future<RemoteServiceAck>) responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
ackFuture.addListener(new FutureListener<RemoteServiceAck>() { ackFuture.addListener(new FutureListener<RemoteServiceAck>() {
@Override @Override
public void operationComplete(Future<RemoteServiceAck> future) throws Exception { public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
result.setFailure(future.cause());
return; return;
} }
RemoteServiceAck ack = future.getNow(); RemoteServiceAck ack = future.getNow();
if (ack == null) { if (ack == null) {
final String ackName = getAckName(remoteInterface.getName(), request.getRequestId());
Future<RemoteServiceAck> ackFutureAttempt = tryPollAckAgainAsync(optionsCopy, responseQueue, ackName); Future<RemoteServiceAck> ackFutureAttempt = tryPollAckAgainAsync(optionsCopy, responseQueue, ackName);
ackFutureAttempt.addListener(new FutureListener<RemoteServiceAck>() { ackFutureAttempt.addListener(new FutureListener<RemoteServiceAck>() {
@ -343,55 +347,15 @@ public class RedissonRemoteService implements RRemoteService {
} }
}); });
} else { } else {
invokeAsync(optionsCopy, result, request, responseQueue, ackName); invokeAsync(optionsCopy, result, request, responseQueue);
} }
} }
private void invokeAsync(final RemoteInvocationOptions optionsCopy,
final Promise<Object> result, final RemoteServiceRequest request,
final RBlockingQueue<? extends RRemoteServiceResponse> responseQueue,
final String ackName) {
Future<Boolean> deleteFuture = redisson.getBucket(ackName).deleteAsync();
deleteFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}
// poll for the response only if expected
if (optionsCopy.isResultExpected()) {
Future<RemoteServiceResponse> responseFuture = (Future<RemoteServiceResponse>) responseQueue.pollAsync(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS);
responseFuture.addListener(new FutureListener<RemoteServiceResponse>() {
@Override
public void operationComplete(Future<RemoteServiceResponse> future)
throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}
if (future.getNow() == null) {
RemoteServiceTimeoutException e = new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
result.setFailure(e);
return;
}
if (future.getNow().getError() != null) {
result.setFailure(future.getNow().getError());
}
result.setSuccess(future.getNow().getResult());
}
});
}
}
});
}
}); });
} else {
if (optionsCopy.isResultExpected()) {
invokeAsync(optionsCopy, result, request, responseQueue);
}
} }
} }
}); });
@ -403,6 +367,55 @@ public class RedissonRemoteService implements RRemoteService {
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler); return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler);
} }
private void invokeAsync(final RemoteInvocationOptions optionsCopy,
final Promise<Object> result, final RemoteServiceRequest request,
final RBlockingQueue<? extends RRemoteServiceResponse> responseQueue,
final String ackName) {
Future<Boolean> deleteFuture = redisson.getBucket(ackName).deleteAsync();
deleteFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}
invokeAsync(optionsCopy, result, request, responseQueue);
}
});
}
private void invokeAsync(final RemoteInvocationOptions optionsCopy, final Promise<Object> result,
final RemoteServiceRequest request,
final RBlockingQueue<? extends RRemoteServiceResponse> responseQueue) {
// poll for the response only if expected
if (optionsCopy.isResultExpected()) {
Future<RemoteServiceResponse> responseFuture = (Future<RemoteServiceResponse>) responseQueue.pollAsync(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS);
responseFuture.addListener(new FutureListener<RemoteServiceResponse>() {
@Override
public void operationComplete(Future<RemoteServiceResponse> future)
throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}
if (future.getNow() == null) {
RemoteServiceTimeoutException e = new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
result.setFailure(e);
return;
}
if (future.getNow().getError() != null) {
result.setFailure(future.getNow().getError());
}
result.setSuccess(future.getNow().getResult());
}
});
}
}
private <T> T sync(Class<T> remoteInterface, final RemoteInvocationOptions options) { private <T> T sync(Class<T> remoteInterface, final RemoteInvocationOptions options) {
final String interfaceName = remoteInterface.getName(); final String interfaceName = remoteInterface.getName();
@ -439,7 +452,7 @@ public class RedissonRemoteService implements RRemoteService {
// poll for the ack only if expected // poll for the ack only if expected
if (optionsCopy.isAckExpected()) { if (optionsCopy.isAckExpected()) {
String ackName = name + ":{" + interfaceName + "}:ack"; String ackName = getAckName(interfaceName, requestId);
RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS); RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
if (ack == null) { if (ack == null) {
ack = tryPollAckAgain(optionsCopy, responseQueue, ackName); ack = tryPollAckAgain(optionsCopy, responseQueue, ackName);
@ -469,6 +482,10 @@ public class RedissonRemoteService implements RRemoteService {
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler); return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler);
} }
private String getAckName(String interfaceName, String requestId) {
return name + ":{" + interfaceName + "}:" + requestId + ":ack";
}
private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy, private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy,
RBlockingQueue<? extends RRemoteServiceResponse> responseQueue, String ackName) throws InterruptedException { RBlockingQueue<? extends RRemoteServiceResponse> responseQueue, String ackName) throws InterruptedException {
Future<Boolean> ackClientsFuture = redisson.getScript().evalAsync(ackName, Mode.READ_WRITE, LongCodec.INSTANCE, Future<Boolean> ackClientsFuture = redisson.getScript().evalAsync(ackName, Mode.READ_WRITE, LongCodec.INSTANCE,
@ -477,7 +494,8 @@ public class RedissonRemoteService implements RRemoteService {
+ "return 0;" + "return 0;"
+ "end;" + "end;"
+ "redis.call('del', KEYS[1]);" + "redis.call('del', KEYS[1]);"
+ "return 1;", RScript.ReturnType.BOOLEAN, Arrays.<Object>asList(ackName), optionsCopy.getAckTimeoutInMillis()); + "return 1;",
RScript.ReturnType.BOOLEAN, Arrays.<Object>asList(ackName), optionsCopy.getAckTimeoutInMillis());
ackClientsFuture.sync(); ackClientsFuture.sync();
if (ackClientsFuture.getNow()) { if (ackClientsFuture.getNow()) {
@ -491,8 +509,12 @@ public class RedissonRemoteService implements RRemoteService {
throws InterruptedException { throws InterruptedException {
final Promise<RemoteServiceAck> promise = ImmediateEventExecutor.INSTANCE.newPromise(); final Promise<RemoteServiceAck> promise = ImmediateEventExecutor.INSTANCE.newPromise();
Future<Boolean> ackClientsFuture = redisson.getScript().evalAsync(ackName, Mode.READ_WRITE, LongCodec.INSTANCE, Future<Boolean> ackClientsFuture = redisson.getScript().evalAsync(ackName, Mode.READ_WRITE, LongCodec.INSTANCE,
"if redis.call('setnx', KEYS[1], 1) == 1 then " + "redis.call('pexpire', KEYS[1], ARGV[1]);" "if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "return 0;" + "end;" + "redis.call('del', KEYS[1]);" + "return 1;", + "redis.call('pexpire', KEYS[1], ARGV[1]);"
+ "return 0;"
+ "end;"
+ "redis.call('del', KEYS[1]);"
+ "return 1;",
RScript.ReturnType.BOOLEAN, Arrays.<Object> asList(ackName), optionsCopy.getAckTimeoutInMillis()); RScript.ReturnType.BOOLEAN, Arrays.<Object> asList(ackName), optionsCopy.getAckTimeoutInMillis());
ackClientsFuture.addListener(new FutureListener<Boolean>() { ackClientsFuture.addListener(new FutureListener<Boolean>() {
@Override @Override

@ -69,6 +69,12 @@ public class RedissonRemoteServiceTest extends BaseTest {
Future<Long> resultMethod(Long value); Future<Long> resultMethod(Long value);
Future<Void> errorMethod();
Future<Void> errorMethodWithCause();
Future<Void> timeoutMethod();
} }
@RRemoteAsync(RemoteInterface.class) @RRemoteAsync(RemoteInterface.class)
@ -474,6 +480,47 @@ public class RedissonRemoteServiceTest extends BaseTest {
} }
} }
@Test
public void testNoAckWithResultInvocationsAsync() throws InterruptedException, ExecutionException {
RedissonClient server = createInstance();
RedissonClient client = createInstance();
try {
server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());
// no ack but an execution timeout of 1 second
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.SECONDS);
RemoteInterfaceAsync service = client.getRemoteSerivce().get(RemoteInterfaceAsync.class, options);
service.voidMethod("noAck", 100L).get();
assertThat(service.resultMethod(21L).get()).isEqualTo(42);
try {
service.errorMethod().get();
Assert.fail();
} catch (Exception e) {
assertThat(e.getCause().getMessage()).isEqualTo("Checking error throw");
}
try {
service.errorMethodWithCause().get();
Assert.fail();
} catch (Exception e) {
assertThat(e.getCause().getCause()).isInstanceOf(ArithmeticException.class);
assertThat(e.getCause().getCause().getMessage()).isEqualTo("/ by zero");
}
try {
service.timeoutMethod().get();
Assert.fail("noAck option should still wait for the server to return a response and throw if the execution timeout is exceeded");
} catch (Exception e) {
assertThat(e.getCause()).isInstanceOf(RemoteServiceTimeoutException.class);
}
} finally {
client.shutdown();
server.shutdown();
}
}
@Test @Test
public void testAckWithoutResultInvocations() throws InterruptedException { public void testAckWithoutResultInvocations() throws InterruptedException {
RedissonClient server = createInstance(); RedissonClient server = createInstance();

Loading…
Cancel
Save