proof of concept for unacknowledged and/or fire-and-forget calls in RRemoteService

pull/497/head
Pierre-David Bélanger 9 years ago
parent 7b85da1f21
commit b2e58d6a25

@ -110,7 +110,8 @@ public class RedissonRemoteService implements RRemoteService {
// subscribe(remoteInterface, requestQueue); // subscribe(remoteInterface, requestQueue);
final RemoteServiceRequest request = future.getNow(); final RemoteServiceRequest request = future.getNow();
if (System.currentTimeMillis() - request.getDate() > request.getAckTimeout()) { // negative ackTimeout means unacknowledged call, do not check the ack
if (request.getAckTimeout() >= 0 && System.currentTimeMillis() - request.getDate() > request.getAckTimeout()) {
log.debug("request: {} has been skipped due to ackTimeout"); log.debug("request: {} has been skipped due to ackTimeout");
// re-subscribe after a skipped ackTimeout // re-subscribe after a skipped ackTimeout
subscribe(remoteInterface, requestQueue); subscribe(remoteInterface, requestQueue);
@ -120,23 +121,28 @@ public class RedissonRemoteService implements RRemoteService {
final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName())); final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName()));
final String responseName = name + ":{" + remoteInterface.getName() + "}:" + request.getRequestId(); final String responseName = name + ":{" + remoteInterface.getName() + "}:" + request.getRequestId();
Future<List<?>> ackClientsFuture = send(request.getAckTimeout(), responseName, new RemoteServiceAck()); // negative ackTimeout means unacknowledged call, do not send the ack
ackClientsFuture.addListener(new FutureListener<List<?>>() { if (request.getAckTimeout() >= 0) {
@Override Future<List<?>> ackClientsFuture = send(request.getAckTimeout(), responseName, new RemoteServiceAck());
public void operationComplete(Future<List<?>> future) throws Exception { ackClientsFuture.addListener(new FutureListener<List<?>>() {
if (!future.isSuccess()) { @Override
log.error("Can't send ack for request: " + request, future.cause()); public void operationComplete(Future<List<?>> future) throws Exception {
if (future.cause() instanceof RedissonShutdownException) { if (!future.isSuccess()) {
log.error("Can't send ack for request: " + request, future.cause());
if (future.cause() instanceof RedissonShutdownException) {
return;
}
// re-subscribe after a failed send (ack)
subscribe(remoteInterface, requestQueue);
return; return;
} }
// re-subscribe after a failed send (ack)
subscribe(remoteInterface, requestQueue);
return;
}
invokeMethod(remoteInterface, requestQueue, request, method, responseName); invokeMethod(remoteInterface, requestQueue, request, method, responseName);
} }
}); });
} else {
invokeMethod(remoteInterface, requestQueue, request, method, responseName);
}
} }
}); });
@ -154,20 +160,26 @@ public class RedissonRemoteService implements RRemoteService {
log.error("Can't execute: " + request, e); log.error("Can't execute: " + request, e);
} }
Future<List<?>> clientsFuture = send(request.getResponseTimeout(), responseName, responseHolder.get()); // negative responseTimeout means fire-and-forget call, do not send the response
clientsFuture.addListener(new FutureListener<List<?>>() { if (request.getResponseTimeout() >= 0) {
@Override Future<List<?>> clientsFuture = send(request.getResponseTimeout(), responseName, responseHolder.get());
public void operationComplete(Future<List<?>> future) throws Exception { clientsFuture.addListener(new FutureListener<List<?>>() {
if (!future.isSuccess()) { @Override
log.error("Can't send response: " + responseHolder.get() + " for request: " + request, future.cause()); public void operationComplete(Future<List<?>> future) throws Exception {
if (future.cause() instanceof RedissonShutdownException) { if (!future.isSuccess()) {
return; log.error("Can't send response: " + responseHolder.get() + " for request: " + request, future.cause());
if (future.cause() instanceof RedissonShutdownException) {
return;
}
} }
// re-subscribe anyways (fail or success) after the send (response)
subscribe(remoteInterface, requestQueue);
} }
// re-subscribe anyways (fail or success) after the send (response) });
subscribe(remoteInterface, requestQueue); } else {
} // re-subscribe anyways after the method invocation
}); subscribe(remoteInterface, requestQueue);
}
} }
@Override @Override
@ -205,19 +217,27 @@ public class RedissonRemoteService implements RRemoteService {
String responseName = name + ":{" + remoteInterface.getName() + "}:" + requestId; String responseName = name + ":{" + remoteInterface.getName() + "}:" + requestId;
RBlockingQueue<RRemoteServiceResponse> responseQueue = redisson.getBlockingQueue(responseName); RBlockingQueue<RRemoteServiceResponse> responseQueue = redisson.getBlockingQueue(responseName);
RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(ackTimeout, ackTimeUnit); // negative ackTimeout means unacknowledged call, do not poll for the ack
if (ack == null) { if (ackTimeout >= 0) {
throw new RemoteServiceAckTimeoutException("No ACK response after " + ackTimeUnit.toMillis(ackTimeout) + "ms for request: " + request); RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(ackTimeout, ackTimeUnit);
if (ack == null) {
throw new RemoteServiceAckTimeoutException("No ACK response after " + ackTimeUnit.toMillis(ackTimeout) + "ms for request: " + request);
}
} }
RemoteServiceResponse response = (RemoteServiceResponse) responseQueue.poll(executionTimeout, executionTimeUnit); // negative executionTimeout means fire-and-forget call, do not poll for the response
if (response == null) { if (executionTimeout >= 0) {
throw new RemoteServiceTimeoutException("No response after " + executionTimeUnit.toMillis(executionTimeout) + "ms for request: " + request); RemoteServiceResponse response = (RemoteServiceResponse) responseQueue.poll(executionTimeout, executionTimeUnit);
} if (response == null) {
if (response.getError() != null) { throw new RemoteServiceTimeoutException("No response after " + executionTimeUnit.toMillis(executionTimeout) + "ms for request: " + request);
throw response.getError(); }
if (response.getError() != null) {
throw response.getError();
}
return response.getResult();
} }
return response.getResult();
return getDefaultValue(method.getReturnType());
} }
}; };
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] {remoteInterface}, handler); return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] {remoteInterface}, handler);
@ -238,4 +258,32 @@ public class RedissonRemoteService implements RRemoteService {
return batch.executeAsync(); return batch.executeAsync();
} }
/**
* Horrible hack to get the default value for a Class
*
* @param type the Class
* @return the default value as
*/
private static Object getDefaultValue(Class type) {
if (!type.isPrimitive() || type.equals(void.class)) {
return null;
} else if (type.equals(boolean.class)) {
return false;
} else if (type.equals(byte.class)) {
return (byte) 0;
} else if (type.equals(char.class)) {
return (char) 0;
} else if (type.equals(short.class)) {
return (short) 0;
} else if (type.equals(int.class)) {
return (int) 0;
} else if (type.equals(long.class)) {
return (long) 0;
} else if (type.equals(float.class)) {
return (float) 0;
} else if (type.equals(double.class)) {
return (double) 0;
}
throw new IllegalArgumentException("Class " + type + " not supported");
}
} }

@ -91,6 +91,8 @@ public interface RRemoteService {
* with specified invocation timeout. * with specified invocation timeout.
* <p/> * <p/>
* Ack timeout = 1000 ms by default * Ack timeout = 1000 ms by default
* <p/>
* A negative executionTimeout will make fire-and-forget calls
* *
* @param remoteInterface * @param remoteInterface
* @param executionTimeout - invocation timeout * @param executionTimeout - invocation timeout
@ -102,6 +104,10 @@ public interface RRemoteService {
/** /**
* Get remote service object for remote invocations * Get remote service object for remote invocations
* with specified invocation and ack timeouts * with specified invocation and ack timeouts
* <p/>
* A negative executionTimeout will make fire-and-forget calls
* <p/>
* A negative ackTimeout will make unacknowledged calls
* *
* @param remoteInterface * @param remoteInterface
* @param executionTimeout - invocation timeout * @param executionTimeout - invocation timeout

@ -62,6 +62,8 @@ public class RedissonRemoteServiceTest extends BaseTest {
void voidMethod(String name, Long param); void voidMethod(String name, Long param);
int primitiveMethod();
Long resultMethod(Long value); Long resultMethod(Long value);
void errorMethod() throws IOException; void errorMethod() throws IOException;
@ -83,6 +85,11 @@ public class RedissonRemoteServiceTest extends BaseTest {
System.out.println(name + " " + param); System.out.println(name + " " + param);
} }
@Override
public int primitiveMethod() {
return 42;
}
@Override @Override
public Long resultMethod(Long value) { public Long resultMethod(Long value) {
return value*2; return value*2;
@ -362,4 +369,75 @@ public class RedissonRemoteServiceTest extends BaseTest {
server.shutdown(); server.shutdown();
} }
} }
@Test
public void testUnacknowledgedInvocations() throws InterruptedException {
RedissonClient r1 = Redisson.create();
r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());
RedissonClient r2 = Redisson.create();
RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class, 30, TimeUnit.SECONDS, -1, TimeUnit.SECONDS);
ri.voidMethod("someName", 100L);
assertThat(ri.resultMethod(100L)).isEqualTo(200);
assertThat(ri.primitiveMethod()).isEqualTo(42);
try {
ri.errorMethod();
Assert.fail();
} catch (IOException e) {
assertThat(e.getMessage()).isEqualTo("Checking error throw");
}
try {
ri.errorMethodWithCause();
Assert.fail();
} catch (Exception e) {
assertThat(e.getCause()).isInstanceOf(ArithmeticException.class);
assertThat(e.getCause().getMessage()).isEqualTo("/ by zero");
}
long time = System.currentTimeMillis();
ri.timeoutMethod();
time = System.currentTimeMillis() - time;
assertThat(time).describedAs("unacknowledged should still wait for the server to return a response").isGreaterThanOrEqualTo(2000);
r1.shutdown();
r2.shutdown();
}
@Test
public void testFireAndForgetInvocations() throws InterruptedException {
RedissonClient r1 = Redisson.create();
r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());
RedissonClient r2 = Redisson.create();
RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class, -1, TimeUnit.SECONDS);
ri.voidMethod("someName", 100L);
assertThat(ri.resultMethod(100L)).isNull();
assertThat(ri.primitiveMethod()).isEqualTo(0);
try {
ri.errorMethod();
} catch (IOException e) {
Assert.fail("fire-and-forget should not throw");
}
try {
ri.errorMethodWithCause();
} catch (Exception e) {
Assert.fail("fire-and-forget should not throw");
}
long time = System.currentTimeMillis();
ri.timeoutMethod();
time = System.currentTimeMillis() - time;
assertThat(time).describedAs("fire-and-forget should not wait for the server to return a response").isLessThan(2000);
r1.shutdown();
r2.shutdown();
}
} }

Loading…
Cancel
Save