From 52c223fdeab27b10bfc88c1c3f5df874508b2bc0 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 20 May 2016 14:32:36 +0300 Subject: [PATCH 01/15] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 9d87ecd05..8464eaaec 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ Features * [Spring cache](http://docs.spring.io/spring/docs/current/spring-framework-reference/html/cache.html) integration * Supports [Reactive Streams](http://www.reactive-streams.org) * Supports [Redis pipelining](http://redis.io/topics/pipelining) (command batches) -* Supports [Remote services](https://github.com/mrniko/redisson/wiki/5.-distributed-objects#513-remote-service) +* Supports [Remote services](https://github.com/mrniko/redisson/wiki/5.-distributed-objects#59-remote-service) * Supports Android platform * Supports auto-reconnect * Supports failed to send command auto-retry From 9793ce2c5642e0d17abb157acb3b722976577c6b Mon Sep 17 00:00:00 2001 From: Rui Gu Date: Fri, 20 May 2016 14:03:48 +0100 Subject: [PATCH 02/15] Fixed ClassCastException thrown from SortedSet add method --- src/main/java/org/redisson/RedissonList.java | 4 ++-- src/main/java/org/redisson/client/protocol/RedisCommands.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/redisson/RedissonList.java b/src/main/java/org/redisson/RedissonList.java index 5678e9d98..e8e46c490 100644 --- a/src/main/java/org/redisson/RedissonList.java +++ b/src/main/java/org/redisson/RedissonList.java @@ -568,12 +568,12 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public Future addAfterAsync(V elementToFind, V element) { - return commandExecutor.writeAsync(getName(), codec, RedisCommands.LINSERT, getName(), "AFTER", elementToFind, element); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.LINSERT_INT, getName(), "AFTER", elementToFind, element); } @Override public Future addBeforeAsync(V elementToFind, V element) { - return commandExecutor.writeAsync(getName(), codec, RedisCommands.LINSERT, getName(), "BEFORE", elementToFind, element); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.LINSERT_INT, getName(), "BEFORE", elementToFind, element); } @Override diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index edd4b8424..d6252e221 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -143,7 +143,8 @@ public interface RedisCommands { RedisCommand LREM_SINGLE = new RedisCommand("LREM", new BooleanReplayConvertor(), 3); RedisStrictCommand LREM = new RedisStrictCommand("LREM", 3); RedisCommand LINDEX = new RedisCommand("LINDEX"); - RedisCommand LINSERT = new RedisCommand("LINSERT", new IntegerReplayConvertor(), 3, ValueType.OBJECTS); + RedisCommand LINSERT = new RedisCommand("LINSERT", 3, ValueType.OBJECTS); + RedisCommand LINSERT_INT = new RedisCommand("LINSERT", new IntegerReplayConvertor(), 3, ValueType.OBJECTS); RedisStrictCommand LLEN_INT = new RedisStrictCommand("LLEN", new IntegerReplayConvertor()); RedisStrictCommand LLEN = new RedisStrictCommand("LLEN"); RedisStrictCommand LTRIM = new RedisStrictCommand("LTRIM", new VoidReplayConvertor()); From 9f015ef0248fe9118a4a4b95f57fb0dd1165a40a Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Sat, 21 May 2016 13:41:38 +0300 Subject: [PATCH 03/15] Update README.md --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 8464eaaec..07f493e19 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,7 @@ Redis based In-Memory Data Grid for Java. Redisson. [![Maven Central](https://img.shields.io/maven-central/v/org.redisson/redisson.svg?style=flat-square)](https://maven-badges.herokuapp.com/maven-central/org.redisson/redisson/) ##Please take part in [Redisson survey](https://ru.surveymonkey.com/r/LP7RG8Q) +###Try [Redisson PRO](http://redisson.pro) edition Use familiar Java data structures with power of [Redis](http://redis.io). From 7ba4ae74e24173a7855564f73eb10b5b150f3827 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Sat, 21 May 2016 13:42:32 +0300 Subject: [PATCH 04/15] Update README.md --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 07f493e19..b3bbf8c41 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,6 @@ Redis based In-Memory Data Grid for Java. Redisson. [![Maven Central](https://img.shields.io/maven-central/v/org.redisson/redisson.svg?style=flat-square)](https://maven-badges.herokuapp.com/maven-central/org.redisson/redisson/) -##Please take part in [Redisson survey](https://ru.surveymonkey.com/r/LP7RG8Q) ###Try [Redisson PRO](http://redisson.pro) edition Use familiar Java data structures with power of [Redis](http://redis.io). @@ -20,6 +19,8 @@ Licensed under the Apache License 2.0. Welcome to support chat - [![Join the chat at https://gitter.im/mrniko/redisson](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/mrniko/redisson?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) +##Please take part in [Redisson survey](https://ru.surveymonkey.com/r/LP7RG8Q) + Features ================================ * [AWS ElastiCache](https://aws.amazon.com/elasticache/) servers mode: From aecf328f75b388002545271190d48fc9368e1389 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre-David=20Be=CC=81langer?= Date: Sat, 21 May 2016 20:27:33 -0400 Subject: [PATCH 05/15] not make a remote call when toString, equals and hashCode are called on the Remote Service client proxy --- .../org/redisson/RedissonRemoteService.java | 9 ++++++ .../redisson/RedissonRemoteServiceTest.java | 29 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index c3431a3b8..0f1df58bd 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -182,9 +182,18 @@ public class RedissonRemoteService implements RRemoteService { public T get(final Class remoteInterface, final long executionTimeout, final TimeUnit executionTimeUnit, final long ackTimeout, final TimeUnit ackTimeUnit) { + final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId(); InvocationHandler handler = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if (method.getName().equals("toString")) { + return toString; + } else if (method.getName().equals("equals")) { + return proxy == args[0]; + } else if (method.getName().equals("hashCode")) { + return toString.hashCode(); + } + String requestId = generateRequestId(); String requestQueueName = name + ":{" + remoteInterface.getName() + "}"; diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 11ea1d5a5..0f8dcffc3 100644 --- a/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -205,4 +205,33 @@ public class RedissonRemoteServiceTest extends BaseTest { r1.shutdown(); r2.shutdown(); } + + @Test + public void testProxyToStringEqualsAndHashCode() { + RedissonClient client = Redisson.create(); + try { + RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class); + + try { + System.out.println(service.toString()); + } catch (Exception e) { + Assert.fail("calling toString on the client service proxy should not make a remote call"); + } + + try { + assertThat(service.hashCode() == service.hashCode()).isTrue(); + } catch (Exception e) { + Assert.fail("calling hashCode on the client service proxy should not make a remote call"); + } + + try { + assertThat(service.equals(service)).isTrue(); + } catch (Exception e) { + Assert.fail("calling equals on the client service proxy should not make a remote call"); + } + + } finally { + client.shutdown(); + } + } } From 0a31e3b28cd6b17f5998909d453f5107b536c846 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre-David=20Be=CC=81langer?= Date: Sat, 21 May 2016 19:21:01 -0400 Subject: [PATCH 06/15] Two tests to demonstrate that Redisson Remote Service does not work with serialization based codec --- .../redisson/RedissonRemoteServiceTest.java | 120 +++++++++++++++++- 1 file changed, 114 insertions(+), 6 deletions(-) diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 0f8dcffc3..3452e5acc 100644 --- a/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -1,18 +1,46 @@ package org.redisson; -import static org.assertj.core.api.Assertions.assertThat; +import org.junit.Assert; +import org.junit.Test; +import org.redisson.codec.FstCodec; +import org.redisson.codec.SerializationCodec; +import org.redisson.remote.RemoteServiceTimeoutException; import java.io.IOException; +import java.io.NotSerializableException; +import java.io.Serializable; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Assert; -import org.junit.Test; -import org.redisson.remote.RemoteServiceTimeoutException; +import static org.assertj.core.api.Assertions.assertThat; public class RedissonRemoteServiceTest extends BaseTest { + public class Pojo { + + private String stringField; + + public Pojo(String stringField) { + this.stringField = stringField; + } + + public String getStringField() { + return stringField; + } + + public void setStringField(String stringField) { + this.stringField = stringField; + } + } + + public class SerializablePojo extends Pojo implements Serializable { + + public SerializablePojo(String stringField) { + super(stringField); + } + } + public interface RemoteInterface { void voidMethod(String name, Long param); @@ -24,7 +52,11 @@ public class RedissonRemoteServiceTest extends BaseTest { void errorMethodWithCause(); void timeoutMethod() throws InterruptedException; - + + Pojo doSomethingWithPojo(Pojo pojo); + + SerializablePojo doSomethingWithSerializablePojo(SerializablePojo pojo); + } public class RemoteImpl implements RemoteInterface { @@ -58,7 +90,15 @@ public class RedissonRemoteServiceTest extends BaseTest { Thread.sleep(2000); } - + @Override + public Pojo doSomethingWithPojo(Pojo pojo) { + return pojo; + } + + @Override + public SerializablePojo doSomethingWithSerializablePojo(SerializablePojo pojo) { + return pojo; + } } @Test @@ -234,4 +274,72 @@ public class RedissonRemoteServiceTest extends BaseTest { client.shutdown(); } } + + @Test + public void testInvocationWithFstCodec() { + RedissonClient server = Redisson.create(createConfig().setCodec(new FstCodec())); + RedissonClient client = Redisson.create(createConfig().setCodec(new FstCodec())); + try { + server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); + + RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class); + + try { + assertThat(service.resultMethod(21L)).isEqualTo(42L); + } catch (Exception e) { + Assert.fail("Should be compatible with FstCodec"); + } + + try { + assertThat(service.doSomethingWithPojo(new SerializablePojo("test")).getStringField()).isEqualTo("test"); + } catch (Exception e) { + Assert.fail("Should be compatible with FstCodec"); + } + + try { + assertThat(service.doSomethingWithPojo(new Pojo("test")).getStringField()).isEqualTo("test"); + Assert.fail("FstCodec should not be able to serialize a not serializable class"); + } catch (Exception e) { + assertThat(e).isInstanceOf(RuntimeException.class); + assertThat(e.getMessage()).contains("Pojo does not implement Serializable"); + } + } finally { + client.shutdown(); + server.shutdown(); + } + } + + @Test + public void testInvocationWithSerializationCodec() { + RedissonClient server = Redisson.create(createConfig().setCodec(new SerializationCodec())); + RedissonClient client = Redisson.create(createConfig().setCodec(new SerializationCodec())); + try { + server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); + + RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class); + + try { + assertThat(service.resultMethod(21L)).isEqualTo(42L); + } catch (Exception e) { + Assert.fail("Should be compatible with SerializationCodec"); + } + + try { + assertThat(service.doSomethingWithPojo(new SerializablePojo("test")).getStringField()).isEqualTo("test"); + } catch (Exception e) { + Assert.fail("Should be compatible with SerializationCodec"); + } + + try { + assertThat(service.doSomethingWithPojo(new Pojo("test")).getStringField()).isEqualTo("test"); + Assert.fail("SerializationCodec should not be able to serialize a not serializable class"); + } catch (Exception e) { + assertThat(e).isInstanceOf(NotSerializableException.class); + assertThat(e.getMessage()).contains("Pojo"); + } + } finally { + client.shutdown(); + server.shutdown(); + } + } } From 54ee22fc88445913bca5fac1e50c7a1b3c25a6fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre-David=20Be=CC=81langer?= Date: Sun, 22 May 2016 09:50:27 -0400 Subject: [PATCH 07/15] make remote service request/response related classes implements Serializable to support serialization based codec --- .../remote/RRemoteServiceResponse.java | 4 +- .../org/redisson/remote/RemoteServiceAck.java | 4 +- .../redisson/remote/RemoteServiceRequest.java | 3 +- .../remote/RemoteServiceResponse.java | 4 +- .../redisson/RedissonRemoteServiceTest.java | 38 ++++++++++++++----- 5 files changed, 40 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/redisson/remote/RRemoteServiceResponse.java b/src/main/java/org/redisson/remote/RRemoteServiceResponse.java index 67305dd00..57d20b670 100644 --- a/src/main/java/org/redisson/remote/RRemoteServiceResponse.java +++ b/src/main/java/org/redisson/remote/RRemoteServiceResponse.java @@ -15,6 +15,8 @@ */ package org.redisson.remote; -public interface RRemoteServiceResponse { +import java.io.Serializable; + +public interface RRemoteServiceResponse extends Serializable { } diff --git a/src/main/java/org/redisson/remote/RemoteServiceAck.java b/src/main/java/org/redisson/remote/RemoteServiceAck.java index 2d34c4804..ccfa1680f 100644 --- a/src/main/java/org/redisson/remote/RemoteServiceAck.java +++ b/src/main/java/org/redisson/remote/RemoteServiceAck.java @@ -15,12 +15,14 @@ */ package org.redisson.remote; +import java.io.Serializable; + /** * Worker sends this message when it has received a {@link RemoteServiceRequest}. * * @author Nikita Koksharov * */ -public class RemoteServiceAck implements RRemoteServiceResponse { +public class RemoteServiceAck implements RRemoteServiceResponse, Serializable { } diff --git a/src/main/java/org/redisson/remote/RemoteServiceRequest.java b/src/main/java/org/redisson/remote/RemoteServiceRequest.java index 3980ac4f0..3b65c85fb 100644 --- a/src/main/java/org/redisson/remote/RemoteServiceRequest.java +++ b/src/main/java/org/redisson/remote/RemoteServiceRequest.java @@ -15,9 +15,10 @@ */ package org.redisson.remote; +import java.io.Serializable; import java.util.Arrays; -public class RemoteServiceRequest { +public class RemoteServiceRequest implements Serializable { private String requestId; private String methodName; diff --git a/src/main/java/org/redisson/remote/RemoteServiceResponse.java b/src/main/java/org/redisson/remote/RemoteServiceResponse.java index 55d074870..d6e03ae61 100644 --- a/src/main/java/org/redisson/remote/RemoteServiceResponse.java +++ b/src/main/java/org/redisson/remote/RemoteServiceResponse.java @@ -15,7 +15,9 @@ */ package org.redisson.remote; -public class RemoteServiceResponse implements RRemoteServiceResponse { +import java.io.Serializable; + +public class RemoteServiceResponse implements RRemoteServiceResponse, Serializable { private Object result; private Throwable error; diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 3452e5acc..1b6d6aa3d 100644 --- a/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -1,5 +1,6 @@ package org.redisson; +import io.netty.handler.codec.EncoderException; import org.junit.Assert; import org.junit.Test; import org.redisson.codec.FstCodec; @@ -17,10 +18,13 @@ import static org.assertj.core.api.Assertions.assertThat; public class RedissonRemoteServiceTest extends BaseTest { - public class Pojo { + public static class Pojo { private String stringField; + public Pojo() { + } + public Pojo(String stringField) { this.stringField = stringField; } @@ -34,10 +38,23 @@ public class RedissonRemoteServiceTest extends BaseTest { } } - public class SerializablePojo extends Pojo implements Serializable { + public static class SerializablePojo implements Serializable { + + private String stringField; + + public SerializablePojo() { + } public SerializablePojo(String stringField) { - super(stringField); + this.stringField = stringField; + } + + public String getStringField() { + return stringField; + } + + public void setStringField(String stringField) { + this.stringField = stringField; } } @@ -291,7 +308,7 @@ public class RedissonRemoteServiceTest extends BaseTest { } try { - assertThat(service.doSomethingWithPojo(new SerializablePojo("test")).getStringField()).isEqualTo("test"); + assertThat(service.doSomethingWithSerializablePojo(new SerializablePojo("test")).getStringField()).isEqualTo("test"); } catch (Exception e) { Assert.fail("Should be compatible with FstCodec"); } @@ -300,8 +317,8 @@ public class RedissonRemoteServiceTest extends BaseTest { assertThat(service.doSomethingWithPojo(new Pojo("test")).getStringField()).isEqualTo("test"); Assert.fail("FstCodec should not be able to serialize a not serializable class"); } catch (Exception e) { - assertThat(e).isInstanceOf(RuntimeException.class); - assertThat(e.getMessage()).contains("Pojo does not implement Serializable"); + assertThat(e.getCause()).isInstanceOf(EncoderException.class); + assertThat(e.getCause().getMessage()).contains("Pojo does not implement Serializable"); } } finally { client.shutdown(); @@ -325,8 +342,9 @@ public class RedissonRemoteServiceTest extends BaseTest { } try { - assertThat(service.doSomethingWithPojo(new SerializablePojo("test")).getStringField()).isEqualTo("test"); + assertThat(service.doSomethingWithSerializablePojo(new SerializablePojo("test")).getStringField()).isEqualTo("test"); } catch (Exception e) { + e.printStackTrace(); Assert.fail("Should be compatible with SerializationCodec"); } @@ -334,8 +352,10 @@ public class RedissonRemoteServiceTest extends BaseTest { assertThat(service.doSomethingWithPojo(new Pojo("test")).getStringField()).isEqualTo("test"); Assert.fail("SerializationCodec should not be able to serialize a not serializable class"); } catch (Exception e) { - assertThat(e).isInstanceOf(NotSerializableException.class); - assertThat(e.getMessage()).contains("Pojo"); + e.printStackTrace(); + assertThat(e.getCause()).isInstanceOf(EncoderException.class); + assertThat(e.getCause().getCause()).isInstanceOf(NotSerializableException.class); + assertThat(e.getCause().getCause().getMessage()).contains("Pojo"); } } finally { client.shutdown(); From 7b85da1f21c18868fa796cc5b30ab4618056dc18 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Sun, 22 May 2016 17:07:36 +0300 Subject: [PATCH 08/15] Update README.md --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index b3bbf8c41..da1e0137e 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,6 @@ Redis based In-Memory Data Grid for Java. Redisson. [![Maven Central](https://img.shields.io/maven-central/v/org.redisson/redisson.svg?style=flat-square)](https://maven-badges.herokuapp.com/maven-central/org.redisson/redisson/) -###Try [Redisson PRO](http://redisson.pro) edition - Use familiar Java data structures with power of [Redis](http://redis.io). Based on high-performance async and lock-free Java Redis client and [Netty 4](http://netty.io) framework. @@ -20,6 +18,7 @@ Licensed under the Apache License 2.0. Welcome to support chat - [![Join the chat at https://gitter.im/mrniko/redisson](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/mrniko/redisson?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) ##Please take part in [Redisson survey](https://ru.surveymonkey.com/r/LP7RG8Q) +####Try [Redisson PRO](http://redisson.pro) edition Features ================================ From b2e58d6a25327b1f7c17a12ef1ebe9c3d5b90fdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre-David=20Be=CC=81langer?= Date: Thu, 19 May 2016 22:48:55 -0400 Subject: [PATCH 09/15] proof of concept for unacknowledged and/or fire-and-forget calls in RRemoteService --- .../org/redisson/RedissonRemoteService.java | 134 ++++++++++++------ .../org/redisson/core/RRemoteService.java | 10 +- .../redisson/RedissonRemoteServiceTest.java | 80 ++++++++++- 3 files changed, 178 insertions(+), 46 deletions(-) diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 0f1df58bd..8d94c05be 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -108,9 +108,10 @@ public class RedissonRemoteService implements RRemoteService { // do not subscribe now, see https://github.com/mrniko/redisson/issues/493 // subscribe(remoteInterface, requestQueue); - + final RemoteServiceRequest request = future.getNow(); - if (System.currentTimeMillis() - request.getDate() > request.getAckTimeout()) { + // negative ackTimeout means unacknowledged call, do not check the ack + if (request.getAckTimeout() >= 0 && System.currentTimeMillis() - request.getDate() > request.getAckTimeout()) { log.debug("request: {} has been skipped due to ackTimeout"); // re-subscribe after a skipped ackTimeout subscribe(remoteInterface, requestQueue); @@ -119,24 +120,29 @@ public class RedissonRemoteService implements RRemoteService { final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName())); final String responseName = name + ":{" + remoteInterface.getName() + "}:" + request.getRequestId(); - - Future> ackClientsFuture = send(request.getAckTimeout(), responseName, new RemoteServiceAck()); - ackClientsFuture.addListener(new FutureListener>() { - @Override - public void operationComplete(Future> future) throws Exception { - if (!future.isSuccess()) { - log.error("Can't send ack for request: " + request, future.cause()); - if (future.cause() instanceof RedissonShutdownException) { + + // negative ackTimeout means unacknowledged call, do not send the ack + if (request.getAckTimeout() >= 0) { + Future> ackClientsFuture = send(request.getAckTimeout(), responseName, new RemoteServiceAck()); + ackClientsFuture.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't send ack for request: " + request, future.cause()); + if (future.cause() instanceof RedissonShutdownException) { + return; + } + // re-subscribe after a failed send (ack) + subscribe(remoteInterface, requestQueue); return; } - // re-subscribe after a failed send (ack) - subscribe(remoteInterface, requestQueue); - return; - } - invokeMethod(remoteInterface, requestQueue, request, method, responseName); - } - }); + invokeMethod(remoteInterface, requestQueue, request, method, responseName); + } + }); + } else { + invokeMethod(remoteInterface, requestQueue, request, method, responseName); + } } }); @@ -153,21 +159,27 @@ public class RedissonRemoteService implements RRemoteService { responseHolder.set(response); log.error("Can't execute: " + request, e); } - - Future> clientsFuture = send(request.getResponseTimeout(), responseName, responseHolder.get()); - clientsFuture.addListener(new FutureListener>() { - @Override - public void operationComplete(Future> future) throws Exception { - if (!future.isSuccess()) { - log.error("Can't send response: " + responseHolder.get() + " for request: " + request, future.cause()); - if (future.cause() instanceof RedissonShutdownException) { - return; + + // negative responseTimeout means fire-and-forget call, do not send the response + if (request.getResponseTimeout() >= 0) { + Future> clientsFuture = send(request.getResponseTimeout(), responseName, responseHolder.get()); + clientsFuture.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't send response: " + responseHolder.get() + " for request: " + request, future.cause()); + if (future.cause() instanceof RedissonShutdownException) { + return; + } } + // re-subscribe anyways (fail or success) after the send (response) + subscribe(remoteInterface, requestQueue); } - // re-subscribe anyways (fail or success) after the send (response) - subscribe(remoteInterface, requestQueue); - } - }); + }); + } else { + // re-subscribe anyways after the method invocation + subscribe(remoteInterface, requestQueue); + } } @Override @@ -204,20 +216,28 @@ public class RedissonRemoteService implements RRemoteService { String responseName = name + ":{" + remoteInterface.getName() + "}:" + requestId; RBlockingQueue responseQueue = redisson.getBlockingQueue(responseName); - - RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(ackTimeout, ackTimeUnit); - if (ack == null) { - throw new RemoteServiceAckTimeoutException("No ACK response after " + ackTimeUnit.toMillis(ackTimeout) + "ms for request: " + request); - } - - RemoteServiceResponse response = (RemoteServiceResponse) responseQueue.poll(executionTimeout, executionTimeUnit); - if (response == null) { - throw new RemoteServiceTimeoutException("No response after " + executionTimeUnit.toMillis(executionTimeout) + "ms for request: " + request); + + // negative ackTimeout means unacknowledged call, do not poll for the ack + if (ackTimeout >= 0) { + RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(ackTimeout, ackTimeUnit); + if (ack == null) { + throw new RemoteServiceAckTimeoutException("No ACK response after " + ackTimeUnit.toMillis(ackTimeout) + "ms for request: " + request); + } } - if (response.getError() != null) { - throw response.getError(); + + // negative executionTimeout means fire-and-forget call, do not poll for the response + if (executionTimeout >= 0) { + RemoteServiceResponse response = (RemoteServiceResponse) responseQueue.poll(executionTimeout, executionTimeUnit); + if (response == null) { + throw new RemoteServiceTimeoutException("No response after " + executionTimeUnit.toMillis(executionTimeout) + "ms for request: " + request); + } + if (response.getError() != null) { + throw response.getError(); + } + return response.getResult(); } - return response.getResult(); + + return getDefaultValue(method.getReturnType()); } }; return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] {remoteInterface}, handler); @@ -237,5 +257,33 @@ public class RedissonRemoteService implements RRemoteService { queue.expireAsync(timeout, TimeUnit.MILLISECONDS); return batch.executeAsync(); } - + + /** + * Horrible hack to get the default value for a Class + * + * @param type the Class + * @return the default value as + */ + private static Object getDefaultValue(Class type) { + if (!type.isPrimitive() || type.equals(void.class)) { + return null; + } else if (type.equals(boolean.class)) { + return false; + } else if (type.equals(byte.class)) { + return (byte) 0; + } else if (type.equals(char.class)) { + return (char) 0; + } else if (type.equals(short.class)) { + return (short) 0; + } else if (type.equals(int.class)) { + return (int) 0; + } else if (type.equals(long.class)) { + return (long) 0; + } else if (type.equals(float.class)) { + return (float) 0; + } else if (type.equals(double.class)) { + return (double) 0; + } + throw new IllegalArgumentException("Class " + type + " not supported"); + } } diff --git a/src/main/java/org/redisson/core/RRemoteService.java b/src/main/java/org/redisson/core/RRemoteService.java index f5449b664..386b0e6c2 100644 --- a/src/main/java/org/redisson/core/RRemoteService.java +++ b/src/main/java/org/redisson/core/RRemoteService.java @@ -91,7 +91,9 @@ public interface RRemoteService { * with specified invocation timeout. *

* Ack timeout = 1000 ms by default - * + *

+ * A negative executionTimeout will make fire-and-forget calls + * * @param remoteInterface * @param executionTimeout - invocation timeout * @param executionTimeUnit @@ -102,7 +104,11 @@ public interface RRemoteService { /** * Get remote service object for remote invocations * with specified invocation and ack timeouts - * + *

+ * A negative executionTimeout will make fire-and-forget calls + *

+ * A negative ackTimeout will make unacknowledged calls + * * @param remoteInterface * @param executionTimeout - invocation timeout * @param executionTimeUnit diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 1b6d6aa3d..886771300 100644 --- a/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -61,6 +61,8 @@ public class RedissonRemoteServiceTest extends BaseTest { public interface RemoteInterface { void voidMethod(String name, Long param); + + int primitiveMethod(); Long resultMethod(Long value); @@ -82,7 +84,12 @@ public class RedissonRemoteServiceTest extends BaseTest { public void voidMethod(String name, Long param) { System.out.println(name + " " + param); } - + + @Override + public int primitiveMethod() { + return 42; + } + @Override public Long resultMethod(Long value) { return value*2; @@ -362,4 +369,75 @@ public class RedissonRemoteServiceTest extends BaseTest { server.shutdown(); } } + + @Test + public void testUnacknowledgedInvocations() throws InterruptedException { + RedissonClient r1 = Redisson.create(); + r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); + + RedissonClient r2 = Redisson.create(); + RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class, 30, TimeUnit.SECONDS, -1, TimeUnit.SECONDS); + + ri.voidMethod("someName", 100L); + assertThat(ri.resultMethod(100L)).isEqualTo(200); + + assertThat(ri.primitiveMethod()).isEqualTo(42); + + try { + ri.errorMethod(); + Assert.fail(); + } catch (IOException e) { + assertThat(e.getMessage()).isEqualTo("Checking error throw"); + } + + try { + ri.errorMethodWithCause(); + Assert.fail(); + } catch (Exception e) { + assertThat(e.getCause()).isInstanceOf(ArithmeticException.class); + assertThat(e.getCause().getMessage()).isEqualTo("/ by zero"); + } + + long time = System.currentTimeMillis(); + ri.timeoutMethod(); + time = System.currentTimeMillis() - time; + assertThat(time).describedAs("unacknowledged should still wait for the server to return a response").isGreaterThanOrEqualTo(2000); + + r1.shutdown(); + r2.shutdown(); + } + + @Test + public void testFireAndForgetInvocations() throws InterruptedException { + RedissonClient r1 = Redisson.create(); + r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); + + RedissonClient r2 = Redisson.create(); + RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class, -1, TimeUnit.SECONDS); + + ri.voidMethod("someName", 100L); + assertThat(ri.resultMethod(100L)).isNull(); + + assertThat(ri.primitiveMethod()).isEqualTo(0); + + try { + ri.errorMethod(); + } catch (IOException e) { + Assert.fail("fire-and-forget should not throw"); + } + + try { + ri.errorMethodWithCause(); + } catch (Exception e) { + Assert.fail("fire-and-forget should not throw"); + } + + long time = System.currentTimeMillis(); + ri.timeoutMethod(); + time = System.currentTimeMillis() - time; + assertThat(time).describedAs("fire-and-forget should not wait for the server to return a response").isLessThan(2000); + + r1.shutdown(); + r2.shutdown(); + } } From dd01fdbc141f59ef29939589adcc0781da8c1436 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre-David=20Be=CC=81langer?= Date: Fri, 20 May 2016 22:47:02 -0400 Subject: [PATCH 10/15] Introduce a new option class to control the ack/execution timeouts instead of relying on negative timeout values --- .../org/redisson/RedissonRemoteService.java | 116 +++++------- .../org/redisson/core/RRemoteService.java | 51 ++++-- .../core/RemoteInvocationOptions.java | 127 ++++++++++++++ .../redisson/RedissonRemoteServiceTest.java | 166 +++++++++++++----- 4 files changed, 332 insertions(+), 128 deletions(-) create mode 100644 src/main/java/org/redisson/core/RemoteInvocationOptions.java diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 8d94c05be..5e926de04 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -15,6 +15,16 @@ */ package org.redisson; +import io.netty.buffer.ByteBufUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.ThreadLocalRandom; +import org.redisson.core.*; +import org.redisson.remote.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; @@ -23,27 +33,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.redisson.core.RBatch; -import org.redisson.core.RBlockingQueue; -import org.redisson.core.RBlockingQueueAsync; -import org.redisson.core.RRemoteService; -import org.redisson.remote.RRemoteServiceResponse; -import org.redisson.remote.RemoteServiceAck; -import org.redisson.remote.RemoteServiceAckTimeoutException; -import org.redisson.remote.RemoteServiceKey; -import org.redisson.remote.RemoteServiceMethod; -import org.redisson.remote.RemoteServiceRequest; -import org.redisson.remote.RemoteServiceResponse; -import org.redisson.remote.RemoteServiceTimeoutException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.netty.buffer.ByteBufUtil; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.ThreadLocalRandom; - /** * * @author Nikita Koksharov @@ -181,19 +170,29 @@ public class RedissonRemoteService implements RRemoteService { subscribe(remoteInterface, requestQueue); } } - + @Override public T get(Class remoteInterface) { - return get(remoteInterface, 30, TimeUnit.SECONDS); + return get(remoteInterface, RemoteInvocationOptions.defaults()); } @Override public T get(final Class remoteInterface, final long executionTimeout, final TimeUnit executionTimeUnit) { - return get(remoteInterface, executionTimeout, executionTimeUnit, 1, TimeUnit.SECONDS); + return get(remoteInterface, RemoteInvocationOptions.defaults() + .expectResultWithin(executionTimeout, executionTimeUnit)); } - - public T get(final Class remoteInterface, final long executionTimeout, final TimeUnit executionTimeUnit, - final long ackTimeout, final TimeUnit ackTimeUnit) { + + public T get(final Class remoteInterface, final long executionTimeout, final TimeUnit executionTimeUnit, + final long ackTimeout, final TimeUnit ackTimeUnit) { + return get(remoteInterface, RemoteInvocationOptions.defaults() + .expectAckWithin(ackTimeout, ackTimeUnit) + .expectResultWithin(executionTimeout, executionTimeUnit)); + } + + public T get(final Class remoteInterface, RemoteInvocationOptions options) { + // local copy of the options, to prevent mutation + final long ackTimeoutInMillis = options.isAckExpected() ? options.getAckTimeoutInMillis() : -1; + final long executionTimeoutInMillis = options.isResultExpected() ? options.getExecutionTimeoutInMillis() : -1; final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId(); InvocationHandler handler = new InvocationHandler() { @Override @@ -206,30 +205,36 @@ public class RedissonRemoteService implements RRemoteService { return toString.hashCode(); } + if (executionTimeoutInMillis < 0 && !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE))) + throw new IllegalArgumentException("The noResult option only supports void return value"); + String requestId = generateRequestId(); - + String requestQueueName = name + ":{" + remoteInterface.getName() + "}"; RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); - RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args, - ackTimeUnit.toMillis(ackTimeout), executionTimeUnit.toMillis(executionTimeout), System.currentTimeMillis()); + RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args, + ackTimeoutInMillis, executionTimeoutInMillis, System.currentTimeMillis()); requestQueue.add(request); - - String responseName = name + ":{" + remoteInterface.getName() + "}:" + requestId; - RBlockingQueue responseQueue = redisson.getBlockingQueue(responseName); + + RBlockingQueue responseQueue = null; + if (ackTimeoutInMillis >= 0 || executionTimeoutInMillis >= 0) { + String responseName = name + ":{" + remoteInterface.getName() + "}:" + requestId; + responseQueue = redisson.getBlockingQueue(responseName); + } // negative ackTimeout means unacknowledged call, do not poll for the ack - if (ackTimeout >= 0) { - RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(ackTimeout, ackTimeUnit); + if (ackTimeoutInMillis >= 0) { + RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(ackTimeoutInMillis, TimeUnit.MILLISECONDS); if (ack == null) { - throw new RemoteServiceAckTimeoutException("No ACK response after " + ackTimeUnit.toMillis(ackTimeout) + "ms for request: " + request); + throw new RemoteServiceAckTimeoutException("No ACK response after " + ackTimeoutInMillis + "ms for request: " + request); } } // negative executionTimeout means fire-and-forget call, do not poll for the response - if (executionTimeout >= 0) { - RemoteServiceResponse response = (RemoteServiceResponse) responseQueue.poll(executionTimeout, executionTimeUnit); + if (executionTimeoutInMillis >= 0) { + RemoteServiceResponse response = (RemoteServiceResponse) responseQueue.poll(executionTimeoutInMillis, TimeUnit.MILLISECONDS); if (response == null) { - throw new RemoteServiceTimeoutException("No response after " + executionTimeUnit.toMillis(executionTimeout) + "ms for request: " + request); + throw new RemoteServiceTimeoutException("No response after " + executionTimeoutInMillis + "ms for request: " + request); } if (response.getError() != null) { throw response.getError(); @@ -237,10 +242,10 @@ public class RedissonRemoteService implements RRemoteService { return response.getResult(); } - return getDefaultValue(method.getReturnType()); + return null; } }; - return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] {remoteInterface}, handler); + return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler); } private String generateRequestId() { @@ -257,33 +262,4 @@ public class RedissonRemoteService implements RRemoteService { queue.expireAsync(timeout, TimeUnit.MILLISECONDS); return batch.executeAsync(); } - - /** - * Horrible hack to get the default value for a Class - * - * @param type the Class - * @return the default value as - */ - private static Object getDefaultValue(Class type) { - if (!type.isPrimitive() || type.equals(void.class)) { - return null; - } else if (type.equals(boolean.class)) { - return false; - } else if (type.equals(byte.class)) { - return (byte) 0; - } else if (type.equals(char.class)) { - return (char) 0; - } else if (type.equals(short.class)) { - return (short) 0; - } else if (type.equals(int.class)) { - return (int) 0; - } else if (type.equals(long.class)) { - return (long) 0; - } else if (type.equals(float.class)) { - return (float) 0; - } else if (type.equals(double.class)) { - return (double) 0; - } - throw new IllegalArgumentException("Class " + type + " not supported"); - } } diff --git a/src/main/java/org/redisson/core/RRemoteService.java b/src/main/java/org/redisson/core/RRemoteService.java index 386b0e6c2..1980cbfff 100644 --- a/src/main/java/org/redisson/core/RRemoteService.java +++ b/src/main/java/org/redisson/core/RRemoteService.java @@ -77,22 +77,31 @@ public interface RRemoteService { /** * Get remote service object for remote invocations. *

- * Ack timeout = 1000 ms by default - *

- * Execution timeout = 30 sec by default - * + * This method is a shortcut for + *

+     *     get(remoteInterface, RemoteInvocationOptions.defaults())
+     * 
+ * + * @see RemoteInvocationOptions#defaults() + * @see #get(Class, RemoteInvocationOptions) + * * @param remoteInterface * @return */ T get(Class remoteInterface); - + /** * Get remote service object for remote invocations * with specified invocation timeout. - *

- * Ack timeout = 1000 ms by default *

- * A negative executionTimeout will make fire-and-forget calls + * This method is a shortcut for + *

+     *     get(remoteInterface, RemoteInvocationOptions.defaults()
+     *      .expectResultWithin(executionTimeout, executionTimeUnit))
+     * 
+ * + * @see RemoteInvocationOptions#defaults() + * @see #get(Class, RemoteInvocationOptions) * * @param remoteInterface * @param executionTimeout - invocation timeout @@ -105,9 +114,15 @@ public interface RRemoteService { * Get remote service object for remote invocations * with specified invocation and ack timeouts *

- * A negative executionTimeout will make fire-and-forget calls - *

- * A negative ackTimeout will make unacknowledged calls + * This method is a shortcut for + *

+     *     get(remoteInterface, RemoteInvocationOptions.defaults()
+     *      .expectAckWithin(ackTimeout, ackTimeUnit)
+     *      .expectResultWithin(executionTimeout, executionTimeUnit))
+     * 
+ * + * @see RemoteInvocationOptions + * @see #get(Class, RemoteInvocationOptions) * * @param remoteInterface * @param executionTimeout - invocation timeout @@ -117,5 +132,17 @@ public interface RRemoteService { * @return */ T get(Class remoteInterface, long executionTimeout, TimeUnit executionTimeUnit, long ackTimeout, TimeUnit ackTimeUnit); - + + /** + * Get remote service object for remote invocations + * with the specified options + *

+ * Note that when using the noResult() option, + * it is expected that the invoked method returns void, + * or else IllegalArgumentException will be thrown. + * + * @see RemoteInvocationOptions + */ + T get(Class remoteInterface, RemoteInvocationOptions options); + } diff --git a/src/main/java/org/redisson/core/RemoteInvocationOptions.java b/src/main/java/org/redisson/core/RemoteInvocationOptions.java new file mode 100644 index 000000000..0d386f6b1 --- /dev/null +++ b/src/main/java/org/redisson/core/RemoteInvocationOptions.java @@ -0,0 +1,127 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.concurrent.TimeUnit; + +/** + * RRemoteService invocation options. + * + * Used to tune how RRemoteService will behave + * in regard to the remote invocations acknowledgement + * and execution timeout. + *

+ * Examples: + *

+ *     // 1 second ack timeout and 30 seconds execution timeout
+ *     RemoteInvocationOptions options =
+ *          RemoteInvocationOptions.defaults();
+ *
+ *     // no ack but 30 seconds execution timeout
+ *     RemoteInvocationOptions options =
+ *          RemoteInvocationOptions.defaults()
+ *              .noAck();
+ *
+ *     // 1 second ack timeout then forget the result
+ *     RemoteInvocationOptions options =
+ *          RemoteInvocationOptions.defaults()
+ *              .noResult();
+ *
+ *     // 1 minute ack timeout then forget about the result
+ *     RemoteInvocationOptions options =
+ *          RemoteInvocationOptions.defaults()
+ *              .expectAckWithin(1, TimeUnit.MINUTES)
+ *              .noResult();
+ *
+ *     // no ack and forget about the result (fire and forget)
+ *     RemoteInvocationOptions options =
+ *          RemoteInvocationOptions.defaults()
+ *              .noAck()
+ *              .noResult();
+ * 
+ * + * @see RRemoteService#get(Class, RemoteInvocationOptions) + */ +public class RemoteInvocationOptions { + + private Long ackTimeoutInMillis; + private Long executionTimeoutInMillis; + + private RemoteInvocationOptions(Long ackTimeoutInMillis, Long executionTimeoutInMillis) { + this.ackTimeoutInMillis = ackTimeoutInMillis; + this.executionTimeoutInMillis = executionTimeoutInMillis; + } + + /** + * Creates a new instance of RemoteInvocationOptions with opinionated defaults. + *

+ * This is equivalent to: + *

+     *     RemoteInvocationOptions.defaults()
+     *      .expectAckWithin(1, TimeUnit.SECONDS)
+     *      .expectResultWithin(30, TimeUnit.SECONDS)
+     * 
+ */ + public static RemoteInvocationOptions defaults() { + return new RemoteInvocationOptions(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toMillis(30)); + } + + public Long getAckTimeoutInMillis() { + return ackTimeoutInMillis; + } + + public Long getExecutionTimeoutInMillis() { + return executionTimeoutInMillis; + } + + public boolean isAckExpected() { + return ackTimeoutInMillis != null; + } + + public boolean isResultExpected() { + return executionTimeoutInMillis != null; + } + + public RemoteInvocationOptions expectAckWithin(long ackTimeoutInMillis) { + this.ackTimeoutInMillis = ackTimeoutInMillis; + return this; + } + + public RemoteInvocationOptions expectAckWithin(long ackTimeout, TimeUnit timeUnit) { + this.ackTimeoutInMillis = timeUnit.toMillis(ackTimeout); + return this; + } + + public RemoteInvocationOptions noAck() { + ackTimeoutInMillis = null; + return this; + } + + public RemoteInvocationOptions expectResultWithin(long executionTimeoutInMillis) { + this.executionTimeoutInMillis = executionTimeoutInMillis; + return this; + } + + public RemoteInvocationOptions expectResultWithin(long executionTimeout, TimeUnit timeUnit) { + this.executionTimeoutInMillis = timeUnit.toMillis(executionTimeout); + return this; + } + + public RemoteInvocationOptions noResult() { + executionTimeoutInMillis = null; + return this; + } +} diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 886771300..26a20d902 100644 --- a/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -5,6 +5,8 @@ import org.junit.Assert; import org.junit.Test; import org.redisson.codec.FstCodec; import org.redisson.codec.SerializationCodec; +import org.redisson.core.RemoteInvocationOptions; +import org.redisson.remote.RemoteServiceAckTimeoutException; import org.redisson.remote.RemoteServiceTimeoutException; import java.io.IOException; @@ -62,8 +64,6 @@ public class RedissonRemoteServiceTest extends BaseTest { void voidMethod(String name, Long param); - int primitiveMethod(); - Long resultMethod(Long value); void errorMethod() throws IOException; @@ -85,11 +85,6 @@ public class RedissonRemoteServiceTest extends BaseTest { System.out.println(name + " " + param); } - @Override - public int primitiveMethod() { - return 42; - } - @Override public Long resultMethod(Long value) { return value*2; @@ -255,19 +250,33 @@ public class RedissonRemoteServiceTest extends BaseTest { @Test public void testInvocationWithServiceName() { - String name = "MyServiceName"; + RedissonClient server = Redisson.create(); + RedissonClient client = Redisson.create(); - RedissonClient r1 = Redisson.create(); - r1.getRemoteSerivce(name).register(RemoteInterface.class, new RemoteImpl()); + server.getRemoteSerivce("MyServiceNamespace").register(RemoteInterface.class, new RemoteImpl()); - RedissonClient r2 = Redisson.create(); - RemoteInterface ri = r2.getRemoteSerivce(name).get(RemoteInterface.class); + RemoteInterface serviceRemoteInterface = client.getRemoteSerivce("MyServiceNamespace").get(RemoteInterface.class); + RemoteInterface otherServiceRemoteInterface = client.getRemoteSerivce("MyOtherServiceNamespace").get(RemoteInterface.class); + RemoteInterface defaultServiceRemoteInterface = client.getRemoteSerivce().get(RemoteInterface.class); - ri.voidMethod("someName", 100L); - assertThat(ri.resultMethod(100L)).isEqualTo(200); + assertThat(serviceRemoteInterface.resultMethod(21L)).isEqualTo(42L); - r1.shutdown(); - r2.shutdown(); + try { + otherServiceRemoteInterface.resultMethod(21L); + Assert.fail("Invoking a service in an unregistered custom services namespace should throw"); + } catch (Exception e) { + assertThat(e).isInstanceOf(RemoteServiceAckTimeoutException.class); + } + + try { + defaultServiceRemoteInterface.resultMethod(21L); + Assert.fail("Invoking a service in the unregistered default services namespace should throw"); + } catch (Exception e) { + assertThat(e).isInstanceOf(RemoteServiceAckTimeoutException.class); + } + + client.shutdown(); + server.shutdown(); } @Test @@ -371,73 +380,138 @@ public class RedissonRemoteServiceTest extends BaseTest { } @Test - public void testUnacknowledgedInvocations() throws InterruptedException { - RedissonClient r1 = Redisson.create(); - r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); + public void testNoAckWithResultInvocations() throws InterruptedException { + RedissonClient server = Redisson.create(); + RedissonClient client = Redisson.create(); - RedissonClient r2 = Redisson.create(); - RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class, 30, TimeUnit.SECONDS, -1, TimeUnit.SECONDS); + server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); - ri.voidMethod("someName", 100L); - assertThat(ri.resultMethod(100L)).isEqualTo(200); + // no ack but an execution timeout of 1 second + RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.SECONDS); + RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class, options); - assertThat(ri.primitiveMethod()).isEqualTo(42); + service.voidMethod("noAck", 100L); + assertThat(service.resultMethod(21L)).isEqualTo(42); try { - ri.errorMethod(); + service.errorMethod(); Assert.fail(); } catch (IOException e) { assertThat(e.getMessage()).isEqualTo("Checking error throw"); } try { - ri.errorMethodWithCause(); + service.errorMethodWithCause(); Assert.fail(); } catch (Exception e) { assertThat(e.getCause()).isInstanceOf(ArithmeticException.class); assertThat(e.getCause().getMessage()).isEqualTo("/ by zero"); } + try { + service.timeoutMethod(); + Assert.fail("noAck option should still wait for the server to return a response and throw if the execution timeout is exceeded"); + } catch (Exception e) { + assertThat(e).isInstanceOf(RemoteServiceTimeoutException.class); + } + + client.shutdown(); + server.shutdown(); + } + + @Test + public void testAckWithoutResultInvocations() throws InterruptedException { + RedissonClient server = Redisson.create(); + RedissonClient client = Redisson.create(); + + server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); + + // fire and forget with an ack timeout of 1 sec + RemoteInvocationOptions options = RemoteInvocationOptions.defaults().expectAckWithin(1, TimeUnit.SECONDS).noResult(); + RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class, options); + + service.voidMethod("noResult", 100L); + + try { + service.resultMethod(100L); + Assert.fail(); + } catch (Exception e) { + assertThat(e).isInstanceOf(IllegalArgumentException.class); + } + + try { + service.errorMethod(); + } catch (IOException e) { + Assert.fail("noResult option should not throw server side exception"); + } + + try { + service.errorMethodWithCause(); + } catch (Exception e) { + Assert.fail("noResult option should not throw server side exception"); + } + long time = System.currentTimeMillis(); - ri.timeoutMethod(); + service.timeoutMethod(); time = System.currentTimeMillis() - time; - assertThat(time).describedAs("unacknowledged should still wait for the server to return a response").isGreaterThanOrEqualTo(2000); + assertThat(time).describedAs("noResult option should not wait for the server to return a response").isLessThan(2000); - r1.shutdown(); - r2.shutdown(); + try { + service.timeoutMethod(); + Assert.fail("noResult option should still wait for the server to ack the request and throw if the ack timeout is exceeded"); + } catch (Exception e) { + assertThat(e).isInstanceOf(RemoteServiceAckTimeoutException.class); + } + + client.shutdown(); + server.shutdown(); } @Test - public void testFireAndForgetInvocations() throws InterruptedException { - RedissonClient r1 = Redisson.create(); - r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); + public void testNoAckWithoutResultInvocations() throws InterruptedException { + RedissonClient server = Redisson.create(); + RedissonClient client = Redisson.create(); - RedissonClient r2 = Redisson.create(); - RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class, -1, TimeUnit.SECONDS); + server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); - ri.voidMethod("someName", 100L); - assertThat(ri.resultMethod(100L)).isNull(); + // no ack fire and forget + RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().noResult(); + RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class, options); + RemoteInterface invalidService = client.getRemoteSerivce("Invalid").get(RemoteInterface.class, options); - assertThat(ri.primitiveMethod()).isEqualTo(0); + service.voidMethod("noAck/noResult", 100L); try { - ri.errorMethod(); + service.resultMethod(100L); + Assert.fail(); + } catch (Exception e) { + assertThat(e).isInstanceOf(IllegalArgumentException.class); + } + + try { + service.errorMethod(); } catch (IOException e) { - Assert.fail("fire-and-forget should not throw"); + Assert.fail("noAck with noResult options should not throw server side exception"); } try { - ri.errorMethodWithCause(); + service.errorMethodWithCause(); } catch (Exception e) { - Assert.fail("fire-and-forget should not throw"); + Assert.fail("noAck with noResult options should not throw server side exception"); } long time = System.currentTimeMillis(); - ri.timeoutMethod(); + service.timeoutMethod(); time = System.currentTimeMillis() - time; - assertThat(time).describedAs("fire-and-forget should not wait for the server to return a response").isLessThan(2000); + assertThat(time).describedAs("noAck with noResult options should not wait for the server to return a response").isLessThan(2000); - r1.shutdown(); - r2.shutdown(); + try { + invalidService.voidMethod("noAck/noResult", 21L); + } catch (Exception e) { + Assert.fail("noAck with noResult options should not throw any exception even while invoking a service in an unregistered services namespace"); + } + + client.shutdown(); + server.shutdown(); } } From d1fe17c2015375d243f6771c62da4063d808bf42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre-David=20Be=CC=81langer?= Date: Sat, 21 May 2016 11:20:16 -0400 Subject: [PATCH 11/15] Do not rely on negative values in RedissonRemoteService, use RemoteInvocationOptions methods instead --- .../org/redisson/RedissonRemoteService.java | 31 ++++++++++--------- .../core/RemoteInvocationOptions.java | 5 +++ 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 5e926de04..44212b89c 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -191,8 +191,7 @@ public class RedissonRemoteService implements RRemoteService { public T get(final Class remoteInterface, RemoteInvocationOptions options) { // local copy of the options, to prevent mutation - final long ackTimeoutInMillis = options.isAckExpected() ? options.getAckTimeoutInMillis() : -1; - final long executionTimeoutInMillis = options.isResultExpected() ? options.getExecutionTimeoutInMillis() : -1; + final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options); final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId(); InvocationHandler handler = new InvocationHandler() { @Override @@ -205,36 +204,40 @@ public class RedissonRemoteService implements RRemoteService { return toString.hashCode(); } - if (executionTimeoutInMillis < 0 && !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE))) + if (!optionsCopy.isResultExpected() && !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE))) throw new IllegalArgumentException("The noResult option only supports void return value"); String requestId = generateRequestId(); String requestQueueName = name + ":{" + remoteInterface.getName() + "}"; RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); - RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args, - ackTimeoutInMillis, executionTimeoutInMillis, System.currentTimeMillis()); + RemoteServiceRequest request = new RemoteServiceRequest(requestId, + method.getName(), + args, + optionsCopy.isAckExpected() ? optionsCopy.getAckTimeoutInMillis() : -1, + optionsCopy.isResultExpected() ? optionsCopy.getExecutionTimeoutInMillis() : -1, + System.currentTimeMillis()); requestQueue.add(request); RBlockingQueue responseQueue = null; - if (ackTimeoutInMillis >= 0 || executionTimeoutInMillis >= 0) { + if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) { String responseName = name + ":{" + remoteInterface.getName() + "}:" + requestId; responseQueue = redisson.getBlockingQueue(responseName); } - // negative ackTimeout means unacknowledged call, do not poll for the ack - if (ackTimeoutInMillis >= 0) { - RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(ackTimeoutInMillis, TimeUnit.MILLISECONDS); + // poll for the ack only if expected + if (optionsCopy.isAckExpected()) { + RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS); if (ack == null) { - throw new RemoteServiceAckTimeoutException("No ACK response after " + ackTimeoutInMillis + "ms for request: " + request); + throw new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request); } } - // negative executionTimeout means fire-and-forget call, do not poll for the response - if (executionTimeoutInMillis >= 0) { - RemoteServiceResponse response = (RemoteServiceResponse) responseQueue.poll(executionTimeoutInMillis, TimeUnit.MILLISECONDS); + // poll for the response only if expected + if (optionsCopy.isResultExpected()) { + RemoteServiceResponse response = (RemoteServiceResponse) responseQueue.poll(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS); if (response == null) { - throw new RemoteServiceTimeoutException("No response after " + executionTimeoutInMillis + "ms for request: " + request); + throw new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request); } if (response.getError() != null) { throw response.getError(); diff --git a/src/main/java/org/redisson/core/RemoteInvocationOptions.java b/src/main/java/org/redisson/core/RemoteInvocationOptions.java index 0d386f6b1..628cc5147 100644 --- a/src/main/java/org/redisson/core/RemoteInvocationOptions.java +++ b/src/main/java/org/redisson/core/RemoteInvocationOptions.java @@ -65,6 +65,11 @@ public class RemoteInvocationOptions { this.executionTimeoutInMillis = executionTimeoutInMillis; } + public RemoteInvocationOptions(RemoteInvocationOptions copy) { + this.ackTimeoutInMillis = copy.ackTimeoutInMillis; + this.executionTimeoutInMillis = copy.executionTimeoutInMillis; + } + /** * Creates a new instance of RemoteInvocationOptions with opinionated defaults. *

From 9ee20f69f5e1cfa049bd7adb3bf29b1c58fbcc82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre-David=20Be=CC=81langer?= Date: Sun, 22 May 2016 12:49:44 -0400 Subject: [PATCH 12/15] send the RemoteInvocationOptions into the RemoteServiceRequest for server side use --- .../org/redisson/RedissonRemoteService.java | 24 +-- .../core/RemoteInvocationOptions.java | 21 +- .../redisson/remote/RemoteServiceRequest.java | 26 +-- .../redisson/RedissonRemoteServiceTest.java | 195 +++++++++--------- 4 files changed, 135 insertions(+), 131 deletions(-) diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 44212b89c..2359a57e4 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -99,8 +99,8 @@ public class RedissonRemoteService implements RRemoteService { // subscribe(remoteInterface, requestQueue); final RemoteServiceRequest request = future.getNow(); - // negative ackTimeout means unacknowledged call, do not check the ack - if (request.getAckTimeout() >= 0 && System.currentTimeMillis() - request.getDate() > request.getAckTimeout()) { + // check the ack only if expected + if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request.getOptions().getAckTimeoutInMillis()) { log.debug("request: {} has been skipped due to ackTimeout"); // re-subscribe after a skipped ackTimeout subscribe(remoteInterface, requestQueue); @@ -110,9 +110,9 @@ public class RedissonRemoteService implements RRemoteService { final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName())); final String responseName = name + ":{" + remoteInterface.getName() + "}:" + request.getRequestId(); - // negative ackTimeout means unacknowledged call, do not send the ack - if (request.getAckTimeout() >= 0) { - Future> ackClientsFuture = send(request.getAckTimeout(), responseName, new RemoteServiceAck()); + // send the ack only if expected + if (request.getOptions().isAckExpected()) { + Future> ackClientsFuture = send(request.getOptions().getAckTimeoutInMillis(), responseName, new RemoteServiceAck()); ackClientsFuture.addListener(new FutureListener>() { @Override public void operationComplete(Future> future) throws Exception { @@ -149,9 +149,9 @@ public class RedissonRemoteService implements RRemoteService { log.error("Can't execute: " + request, e); } - // negative responseTimeout means fire-and-forget call, do not send the response - if (request.getResponseTimeout() >= 0) { - Future> clientsFuture = send(request.getResponseTimeout(), responseName, responseHolder.get()); + // send the response only if expected + if (request.getOptions().isResultExpected()) { + Future> clientsFuture = send(request.getOptions().getExecutionTimeoutInMillis(), responseName, responseHolder.get()); clientsFuture.addListener(new FutureListener>() { @Override public void operationComplete(Future> future) throws Exception { @@ -189,7 +189,7 @@ public class RedissonRemoteService implements RRemoteService { .expectResultWithin(executionTimeout, executionTimeUnit)); } - public T get(final Class remoteInterface, RemoteInvocationOptions options) { + public T get(final Class remoteInterface, final RemoteInvocationOptions options) { // local copy of the options, to prevent mutation final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options); final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId(); @@ -212,11 +212,7 @@ public class RedissonRemoteService implements RRemoteService { String requestQueueName = name + ":{" + remoteInterface.getName() + "}"; RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); RemoteServiceRequest request = new RemoteServiceRequest(requestId, - method.getName(), - args, - optionsCopy.isAckExpected() ? optionsCopy.getAckTimeoutInMillis() : -1, - optionsCopy.isResultExpected() ? optionsCopy.getExecutionTimeoutInMillis() : -1, - System.currentTimeMillis()); + method.getName(), args, optionsCopy, System.currentTimeMillis()); requestQueue.add(request); RBlockingQueue responseQueue = null; diff --git a/src/main/java/org/redisson/core/RemoteInvocationOptions.java b/src/main/java/org/redisson/core/RemoteInvocationOptions.java index 628cc5147..265ea81a6 100644 --- a/src/main/java/org/redisson/core/RemoteInvocationOptions.java +++ b/src/main/java/org/redisson/core/RemoteInvocationOptions.java @@ -15,6 +15,7 @@ */ package org.redisson.core; +import java.io.Serializable; import java.util.concurrent.TimeUnit; /** @@ -55,14 +56,12 @@ import java.util.concurrent.TimeUnit; * * @see RRemoteService#get(Class, RemoteInvocationOptions) */ -public class RemoteInvocationOptions { +public class RemoteInvocationOptions implements Serializable { private Long ackTimeoutInMillis; private Long executionTimeoutInMillis; - private RemoteInvocationOptions(Long ackTimeoutInMillis, Long executionTimeoutInMillis) { - this.ackTimeoutInMillis = ackTimeoutInMillis; - this.executionTimeoutInMillis = executionTimeoutInMillis; + private RemoteInvocationOptions() { } public RemoteInvocationOptions(RemoteInvocationOptions copy) { @@ -75,13 +74,15 @@ public class RemoteInvocationOptions { *

* This is equivalent to: *

-     *     RemoteInvocationOptions.defaults()
+     *     new RemoteInvocationOptions()
      *      .expectAckWithin(1, TimeUnit.SECONDS)
      *      .expectResultWithin(30, TimeUnit.SECONDS)
      * 
*/ public static RemoteInvocationOptions defaults() { - return new RemoteInvocationOptions(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toMillis(30)); + return new RemoteInvocationOptions() + .expectAckWithin(1, TimeUnit.SECONDS) + .expectResultWithin(20, TimeUnit.SECONDS); } public Long getAckTimeoutInMillis() { @@ -129,4 +130,12 @@ public class RemoteInvocationOptions { executionTimeoutInMillis = null; return this; } + + @Override + public String toString() { + return "RemoteInvocationOptions[" + + "ackTimeoutInMillis=" + ackTimeoutInMillis + + ", executionTimeoutInMillis=" + executionTimeoutInMillis + + ']'; + } } diff --git a/src/main/java/org/redisson/remote/RemoteServiceRequest.java b/src/main/java/org/redisson/remote/RemoteServiceRequest.java index 3b65c85fb..adc4267f8 100644 --- a/src/main/java/org/redisson/remote/RemoteServiceRequest.java +++ b/src/main/java/org/redisson/remote/RemoteServiceRequest.java @@ -15,6 +15,8 @@ */ package org.redisson.remote; +import org.redisson.core.RemoteInvocationOptions; + import java.io.Serializable; import java.util.Arrays; @@ -23,36 +25,26 @@ public class RemoteServiceRequest implements Serializable { private String requestId; private String methodName; private Object[] args; - private long ackTimeout; - private long responseTimeout; + private RemoteInvocationOptions options; private long date; public RemoteServiceRequest() { } - public RemoteServiceRequest(String requestId, String methodName, Object[] args, long ackTimeout, long responseTimeout, long date) { + public RemoteServiceRequest(String requestId, String methodName, Object[] args, RemoteInvocationOptions options, long date) { super(); this.requestId = requestId; this.methodName = methodName; this.args = args; - this.ackTimeout = ackTimeout; - this.responseTimeout = responseTimeout; + this.options = options; this.date = date; } - public long getResponseTimeout() { - return responseTimeout; - } - public long getDate() { return date; } - public long getAckTimeout() { - return ackTimeout; - } - public String getRequestId() { return requestId; } @@ -60,7 +52,11 @@ public class RemoteServiceRequest implements Serializable { public Object[] getArgs() { return args; } - + + public RemoteInvocationOptions getOptions() { + return options; + } + public String getMethodName() { return methodName; } @@ -68,7 +64,7 @@ public class RemoteServiceRequest implements Serializable { @Override public String toString() { return "RemoteServiceRequest [requestId=" + requestId + ", methodName=" + methodName + ", args=" - + Arrays.toString(args) + ", ackTimeout=" + ackTimeout + ", date=" + date + "]"; + + Arrays.toString(args) + ", options=" + options + ", date=" + date + "]"; } } diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 26a20d902..85e7f015f 100644 --- a/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -383,135 +383,138 @@ public class RedissonRemoteServiceTest extends BaseTest { public void testNoAckWithResultInvocations() throws InterruptedException { RedissonClient server = Redisson.create(); RedissonClient client = Redisson.create(); + try { + server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); - server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); - - // no ack but an execution timeout of 1 second - RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.SECONDS); - RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class, options); + // no ack but an execution timeout of 1 second + RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.SECONDS); + RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class, options); - service.voidMethod("noAck", 100L); - assertThat(service.resultMethod(21L)).isEqualTo(42); + service.voidMethod("noAck", 100L); + assertThat(service.resultMethod(21L)).isEqualTo(42); - try { - service.errorMethod(); - Assert.fail(); - } catch (IOException e) { - assertThat(e.getMessage()).isEqualTo("Checking error throw"); - } + try { + service.errorMethod(); + Assert.fail(); + } catch (IOException e) { + assertThat(e.getMessage()).isEqualTo("Checking error throw"); + } - try { - service.errorMethodWithCause(); - Assert.fail(); - } catch (Exception e) { - assertThat(e.getCause()).isInstanceOf(ArithmeticException.class); - assertThat(e.getCause().getMessage()).isEqualTo("/ by zero"); - } + try { + service.errorMethodWithCause(); + Assert.fail(); + } catch (Exception e) { + assertThat(e.getCause()).isInstanceOf(ArithmeticException.class); + assertThat(e.getCause().getMessage()).isEqualTo("/ by zero"); + } - try { - service.timeoutMethod(); - Assert.fail("noAck option should still wait for the server to return a response and throw if the execution timeout is exceeded"); - } catch (Exception e) { - assertThat(e).isInstanceOf(RemoteServiceTimeoutException.class); + try { + service.timeoutMethod(); + Assert.fail("noAck option should still wait for the server to return a response and throw if the execution timeout is exceeded"); + } catch (Exception e) { + assertThat(e).isInstanceOf(RemoteServiceTimeoutException.class); + } + } finally { + client.shutdown(); + server.shutdown(); } - - client.shutdown(); - server.shutdown(); } @Test public void testAckWithoutResultInvocations() throws InterruptedException { RedissonClient server = Redisson.create(); RedissonClient client = Redisson.create(); + try { + server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); - server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); - - // fire and forget with an ack timeout of 1 sec - RemoteInvocationOptions options = RemoteInvocationOptions.defaults().expectAckWithin(1, TimeUnit.SECONDS).noResult(); - RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class, options); + // fire and forget with an ack timeout of 1 sec + RemoteInvocationOptions options = RemoteInvocationOptions.defaults().expectAckWithin(1, TimeUnit.SECONDS).noResult(); + RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class, options); - service.voidMethod("noResult", 100L); + service.voidMethod("noResult", 100L); - try { - service.resultMethod(100L); - Assert.fail(); - } catch (Exception e) { - assertThat(e).isInstanceOf(IllegalArgumentException.class); - } - - try { - service.errorMethod(); - } catch (IOException e) { - Assert.fail("noResult option should not throw server side exception"); - } + try { + service.resultMethod(100L); + Assert.fail(); + } catch (Exception e) { + assertThat(e).isInstanceOf(IllegalArgumentException.class); + } - try { - service.errorMethodWithCause(); - } catch (Exception e) { - Assert.fail("noResult option should not throw server side exception"); - } + try { + service.errorMethod(); + } catch (IOException e) { + Assert.fail("noResult option should not throw server side exception"); + } - long time = System.currentTimeMillis(); - service.timeoutMethod(); - time = System.currentTimeMillis() - time; - assertThat(time).describedAs("noResult option should not wait for the server to return a response").isLessThan(2000); + try { + service.errorMethodWithCause(); + } catch (Exception e) { + Assert.fail("noResult option should not throw server side exception"); + } - try { + long time = System.currentTimeMillis(); service.timeoutMethod(); - Assert.fail("noResult option should still wait for the server to ack the request and throw if the ack timeout is exceeded"); - } catch (Exception e) { - assertThat(e).isInstanceOf(RemoteServiceAckTimeoutException.class); - } + time = System.currentTimeMillis() - time; + assertThat(time).describedAs("noResult option should not wait for the server to return a response").isLessThan(2000); - client.shutdown(); - server.shutdown(); + try { + service.timeoutMethod(); + Assert.fail("noResult option should still wait for the server to ack the request and throw if the ack timeout is exceeded"); + } catch (Exception e) { + assertThat(e).isInstanceOf(RemoteServiceAckTimeoutException.class); + } + } finally { + client.shutdown(); + server.shutdown(); + } } @Test public void testNoAckWithoutResultInvocations() throws InterruptedException { RedissonClient server = Redisson.create(); RedissonClient client = Redisson.create(); + try { + server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); - server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); - - // no ack fire and forget - RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().noResult(); - RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class, options); - RemoteInterface invalidService = client.getRemoteSerivce("Invalid").get(RemoteInterface.class, options); + // no ack fire and forget + RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().noResult(); + RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class, options); + RemoteInterface invalidService = client.getRemoteSerivce("Invalid").get(RemoteInterface.class, options); - service.voidMethod("noAck/noResult", 100L); + service.voidMethod("noAck/noResult", 100L); - try { - service.resultMethod(100L); - Assert.fail(); - } catch (Exception e) { - assertThat(e).isInstanceOf(IllegalArgumentException.class); - } + try { + service.resultMethod(100L); + Assert.fail(); + } catch (Exception e) { + assertThat(e).isInstanceOf(IllegalArgumentException.class); + } - try { - service.errorMethod(); - } catch (IOException e) { - Assert.fail("noAck with noResult options should not throw server side exception"); - } + try { + service.errorMethod(); + } catch (IOException e) { + Assert.fail("noAck with noResult options should not throw server side exception"); + } - try { - service.errorMethodWithCause(); - } catch (Exception e) { - Assert.fail("noAck with noResult options should not throw server side exception"); - } + try { + service.errorMethodWithCause(); + } catch (Exception e) { + Assert.fail("noAck with noResult options should not throw server side exception"); + } - long time = System.currentTimeMillis(); - service.timeoutMethod(); - time = System.currentTimeMillis() - time; - assertThat(time).describedAs("noAck with noResult options should not wait for the server to return a response").isLessThan(2000); + long time = System.currentTimeMillis(); + service.timeoutMethod(); + time = System.currentTimeMillis() - time; + assertThat(time).describedAs("noAck with noResult options should not wait for the server to return a response").isLessThan(2000); - try { - invalidService.voidMethod("noAck/noResult", 21L); - } catch (Exception e) { - Assert.fail("noAck with noResult options should not throw any exception even while invoking a service in an unregistered services namespace"); + try { + invalidService.voidMethod("noAck/noResult", 21L); + } catch (Exception e) { + Assert.fail("noAck with noResult options should not throw any exception even while invoking a service in an unregistered services namespace"); + } + } finally { + client.shutdown(); + server.shutdown(); } - - client.shutdown(); - server.shutdown(); } } From 6383d5088b88916c0535bf6784ddf7398343d241 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 23 May 2016 13:09:54 +0300 Subject: [PATCH 13/15] FSTObjectOutput shouldn't be closed after write. #505 --- src/main/java/org/redisson/codec/FstCodec.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/org/redisson/codec/FstCodec.java b/src/main/java/org/redisson/codec/FstCodec.java index 52e7788b2..a8c0ddf72 100644 --- a/src/main/java/org/redisson/codec/FstCodec.java +++ b/src/main/java/org/redisson/codec/FstCodec.java @@ -72,8 +72,6 @@ public class FstCodec implements Codec { ByteArrayOutputStream os = new ByteArrayOutputStream(); FSTObjectOutput oos = config.getObjectOutput(os); oos.writeObject(in); - oos.close(); - return os.toByteArray(); } }; From 6efc8469684fc071f6ef6929f0e1ebe793683a15 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 23 May 2016 17:22:47 +0300 Subject: [PATCH 14/15] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index da1e0137e..0308d8bb7 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,7 @@ Features * [Spring cache](http://docs.spring.io/spring/docs/current/spring-framework-reference/html/cache.html) integration * Supports [Reactive Streams](http://www.reactive-streams.org) * Supports [Redis pipelining](http://redis.io/topics/pipelining) (command batches) -* Supports [Remote services](https://github.com/mrniko/redisson/wiki/5.-distributed-objects#59-remote-service) +* Supports [Remote services](https://github.com/mrniko/redisson/wiki/6.-distributed-objects#69-remote-service) * Supports Android platform * Supports auto-reconnect * Supports failed to send command auto-retry From b9e832f3536d7493c6d99776235e3ae5b8a362ea Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 23 May 2016 18:11:03 +0300 Subject: [PATCH 15/15] FstCodec fixed --- src/main/java/org/redisson/codec/FstCodec.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/redisson/codec/FstCodec.java b/src/main/java/org/redisson/codec/FstCodec.java index a8c0ddf72..501883ac0 100644 --- a/src/main/java/org/redisson/codec/FstCodec.java +++ b/src/main/java/org/redisson/codec/FstCodec.java @@ -72,6 +72,7 @@ public class FstCodec implements Codec { ByteArrayOutputStream os = new ByteArrayOutputStream(); FSTObjectOutput oos = config.getObjectOutput(os); oos.writeObject(in); + oos.flush(); return os.toByteArray(); } };