diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index b80124260..ba99b01a4 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -23,12 +23,18 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.redisson.client.RedisException; -import org.redisson.client.RedisTimeoutException; import org.redisson.core.MessageListener; import org.redisson.core.RBlockingQueue; import org.redisson.core.RRemoteService; import org.redisson.core.RTopic; +import org.redisson.remote.RRemoteServiceResponse; +import org.redisson.remote.RemoteServiceAck; +import org.redisson.remote.RemoteServiceAckTimeoutException; +import org.redisson.remote.RemoteServiceKey; +import org.redisson.remote.RemoteServiceMethod; +import org.redisson.remote.RemoteServiceRequest; +import org.redisson.remote.RemoteServiceResponse; +import org.redisson.remote.RemoteServiceTimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,71 +87,126 @@ public class RedissonRemoteService implements RRemoteService { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { + if (future.cause() instanceof RedissonShutdownException) { + return; + } + subscribe(remoteInterface, requestQueue); return; } + subscribe(remoteInterface, requestQueue); RemoteServiceRequest request = future.getNow(); + if (System.currentTimeMillis() - request.getDate() > request.getAckTimeout()) { + log.debug("request: {} has been skipped due to ackTimeout"); + return; + } + RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName())); String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + request.getRequestId(); - RTopic topic = redisson.getTopic(responseName); - RemoteServiceResponse response; - try { - Object result = method.getMethod().invoke(method.getBean(), request.getArgs()); - response = new RemoteServiceResponse(result); - } catch (Exception e) { - response = new RemoteServiceResponse(e.getCause()); - log.error("Can't execute: " + request, e); + RTopic topic = redisson.getTopic(responseName); + Future ackClientsFuture = topic.publishAsync(new RemoteServiceAck()); + ackClientsFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't send ack for request: " + request, future.cause()); + return; + } + if (future.getNow() == 0) { + log.error("Client has not received ack for request: {}", request); + return; + } + + invokeMethod(request, method, topic); + } + }); + } + + }); + } + + private void invokeMethod(RemoteServiceRequest request, RemoteServiceMethod method, + RTopic topic) { + final AtomicReference responseHolder = new AtomicReference(); + try { + Object result = method.getMethod().invoke(method.getBean(), request.getArgs()); + RemoteServiceResponse response = new RemoteServiceResponse(result); + responseHolder.set(response); + } catch (Exception e) { + RemoteServiceResponse response = new RemoteServiceResponse(e.getCause()); + responseHolder.set(response); + log.error("Can't execute: " + request, e); + } + + Future clientsFuture = topic.publishAsync(responseHolder.get()); + clientsFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; } - long clients = topic.publish(response); - if (clients == 0) { - log.error("None of clients has not received a response: {} for request: {}", response, request); + if (future.getNow() == 0) { + log.error("None of clients has not received a response: {} for request: {}", responseHolder.get(), request); } - - subscribe(remoteInterface, requestQueue); } }); } - + @Override public T get(Class remoteInterface) { return get(remoteInterface, -1, null); } @Override - public T get(final Class remoteInterface, final int timeout, final TimeUnit timeUnit) { + public T get(final Class remoteInterface, final long executionTimeout, final TimeUnit executionTimeUnit) { + return get(remoteInterface, executionTimeout, executionTimeUnit, 1, TimeUnit.SECONDS); + } + + public T get(final Class remoteInterface, final long executionTimeout, final TimeUnit executionTimeUnit, + final long ackTimeout, final TimeUnit ackTimeUnit) { InvocationHandler handler = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String requestId = generateRequestId(); - + String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}"; RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); - RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args); + RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args, ackTimeUnit.toMillis(ackTimeout), System.currentTimeMillis()); requestQueue.add(request); String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + requestId; - final RTopic topic = redisson.getTopic(responseName); + final CountDownLatch ackLatch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference response = new AtomicReference(); - int listenerId = topic.addListener(new MessageListener() { + final AtomicReference response = new AtomicReference(); + final RTopic topic = redisson.getTopic(responseName); + int listenerId = topic.addListener(new MessageListener() { @Override - public void onMessage(String channel, RemoteServiceResponse msg) { - response.set(msg); - latch.countDown(); + public void onMessage(String channel, RRemoteServiceResponse msg) { + if (msg instanceof RemoteServiceResponse) { + response.set(msg); + latch.countDown(); + } else { + ackLatch.countDown(); + } } }); - if (timeout == -1) { + if (!ackLatch.await(ackTimeout, ackTimeUnit)) { + topic.removeListener(listenerId); + throw new RemoteServiceAckTimeoutException("No ACK response after " + ackTimeUnit.toMillis(ackTimeout) + "ms for request: " + request); + } + + if (executionTimeout == -1) { latch.await(); } else { - if (!latch.await(timeout, timeUnit)) { + if (!latch.await(executionTimeout, executionTimeUnit)) { topic.removeListener(listenerId); - throw new RedisTimeoutException("No response after " + timeUnit.toMillis(timeout) + "ms for request: " + request); + throw new RemoteServiceTimeoutException("No response after " + executionTimeUnit.toMillis(executionTimeout) + "ms for request: " + request); } } topic.removeListener(listenerId); - RemoteServiceResponse msg = response.get(); + RemoteServiceResponse msg = (RemoteServiceResponse) response.get(); if (msg.getError() != null) { throw msg.getError(); } diff --git a/src/main/java/org/redisson/RedissonShutdownException.java b/src/main/java/org/redisson/RedissonShutdownException.java new file mode 100644 index 000000000..629255e0e --- /dev/null +++ b/src/main/java/org/redisson/RedissonShutdownException.java @@ -0,0 +1,11 @@ +package org.redisson; + +public class RedissonShutdownException extends RuntimeException { + + private static final long serialVersionUID = -2694051226420789395L; + + public RedissonShutdownException(String message) { + super(message); + } + +} diff --git a/src/main/java/org/redisson/codec/JsonJacksonCodec.java b/src/main/java/org/redisson/codec/JsonJacksonCodec.java index ab518b0ed..e90c5cef2 100755 --- a/src/main/java/org/redisson/codec/JsonJacksonCodec.java +++ b/src/main/java/org/redisson/codec/JsonJacksonCodec.java @@ -124,6 +124,7 @@ public class JsonJacksonCodec implements Codec { .withCreatorVisibility(JsonAutoDetect.Visibility.NONE)); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true); + objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true); objectMapper.addMixIn(Throwable.class, ThrowableMixIn.class); } diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index faba6d1fc..2abe97b96 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.redisson.RedisClientResult; +import org.redisson.RedissonShutdownException; import org.redisson.SlotCallback; import org.redisson.client.RedisAskException; import org.redisson.client.RedisConnection; @@ -354,7 +355,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } if (!connectionManager.getShutdownLatch().acquire()) { - mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); + mainPromise.setFailure(new RedissonShutdownException("Redisson is shutdown")); return; } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 065e66f75..fe2a22872 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -677,8 +677,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void shutdown() { + shutdownLatch.close(); shutdownPromise.trySuccess(true); - shutdownLatch.closeAndAwaitUninterruptibly(); + shutdownLatch.awaitUninterruptibly(); + for (MasterSlaveEntry entry : entries.values()) { entry.shutdown(); } diff --git a/src/main/java/org/redisson/core/RRemoteService.java b/src/main/java/org/redisson/core/RRemoteService.java index 1b75c5dc3..0461188ed 100644 --- a/src/main/java/org/redisson/core/RRemoteService.java +++ b/src/main/java/org/redisson/core/RRemoteService.java @@ -17,6 +17,46 @@ package org.redisson.core; import java.util.concurrent.TimeUnit; +import org.redisson.remote.RemoteServiceAckTimeoutException; + +/** + * Allows to execute object methods remotely between Redisson instances (Server side and Client side instances in terms of remote invocation). + *

+ * 1. Server side instance (worker instance). Register object with RRemoteService instance. + *

+ * + * RRemoteService remoteService = redisson.getRemoteService();
+ *
+ * // register remote service before any remote invocation
+ * remoteService.register(SomeServiceInterface.class, someServiceImpl); + *
+ *

+ * 2. Client side instance. Invokes method remotely. + *

+ * + * RRemoteService remoteService = redisson.getRemoteService();
+ * SomeServiceInterface service = remoteService.get(SomeServiceInterface.class);
+ *
+ * String result = service.doSomeStuff(1L, "secondParam", new AnyParam()); + *
+ *

+ *

+ * There are two timeouts during execution: + *

+ * Acknowledge (Ack) timeout.Client side instance waits for acknowledge message from Server side instance. + *

+ * If acknowledge has not been received by Client side instance then RemoteServiceAckTimeoutException will be thrown. + * And next invocation attempt can be made. + *

+ * If acknowledge has not been received Client side instance but Server side instance has received invocation message already. + * In this case invocation will be skipped, due to ack timeout checking by Server side instance. + *

+ * Execution timeout. Client side instance received acknowledge message. If it hasn't received any result or error + * from server side during execution timeout then RemoteServiceTimeoutException will be thrown. + * + * @author Nikita Koksharov + * + */ public interface RRemoteService { /** @@ -37,7 +77,8 @@ public interface RRemoteService { void register(Class remoteInterface, T object, int executorsAmount); /** - * Get remote service object for remote invocations + * Get remote service object for remote invocations. + * Uses ack timeout = 1000 ms by default * * @param remoteInterface * @return @@ -46,13 +87,26 @@ public interface RRemoteService { /** * Get remote service object for remote invocations - * with specified invocation timeout + * with specified invocation timeout. Uses ack timeout = 1000 ms by default + * + * @param remoteInterface + * @param executionTimeout - invocation timeout + * @param executionTimeUnit + * @return + */ + T get(Class remoteInterface, long executionTimeout, TimeUnit executionTimeUnit); + + /** + * Get remote service object for remote invocations + * with specified invocation and ack timeouts * * @param remoteInterface - * @param timeout - invocation timeout - * @param timeUnit + * @param executionTimeout - invocation timeout + * @param executionTimeUnit + * @param ackTimeout - ack timeout + * @param ackTimeUnit * @return */ - T get(Class remoteInterface, int timeout, TimeUnit timeUnit); + T get(Class remoteInterface, long executionTimeout, TimeUnit executionTimeUnit, long ackTimeout, TimeUnit ackTimeUnit); } diff --git a/src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java b/src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java index 1242ce571..e737cd711 100644 --- a/src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java +++ b/src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java @@ -83,9 +83,12 @@ public class InfinitySemaphoreLatch extends AbstractQueuedSynchronizer { return closed; } + public void close() { + closed = true; + } + // waiting for an open state - public final boolean closeAndAwaitUninterruptibly() { - closed = true; + public final boolean awaitUninterruptibly() { try { return await(15, TimeUnit.SECONDS); } catch (InterruptedException e) { diff --git a/src/main/java/org/redisson/remote/RRemoteServiceResponse.java b/src/main/java/org/redisson/remote/RRemoteServiceResponse.java new file mode 100644 index 000000000..de563757e --- /dev/null +++ b/src/main/java/org/redisson/remote/RRemoteServiceResponse.java @@ -0,0 +1,5 @@ +package org.redisson.remote; + +public interface RRemoteServiceResponse { + +} diff --git a/src/main/java/org/redisson/remote/RemoteServiceAck.java b/src/main/java/org/redisson/remote/RemoteServiceAck.java new file mode 100644 index 000000000..497fe1e7b --- /dev/null +++ b/src/main/java/org/redisson/remote/RemoteServiceAck.java @@ -0,0 +1,11 @@ +package org.redisson.remote; + +/** + * Worker sends this message when it has received a {@link RemoteServiceRequest}. + * + * @author Nikita Koksharov + * + */ +public class RemoteServiceAck implements RRemoteServiceResponse { + +} diff --git a/src/main/java/org/redisson/remote/RemoteServiceAckTimeoutException.java b/src/main/java/org/redisson/remote/RemoteServiceAckTimeoutException.java new file mode 100644 index 000000000..76d4b979e --- /dev/null +++ b/src/main/java/org/redisson/remote/RemoteServiceAckTimeoutException.java @@ -0,0 +1,21 @@ +package org.redisson.remote; + +/** + * Rises when remote method executor has not answered + * within Ack timeout. + *

+ * Method invocation has not been started in this case. + * So a new invocation attempt can be made. + * + * @author Nikita Koksharov + * + */ +public class RemoteServiceAckTimeoutException extends RuntimeException { + + private static final long serialVersionUID = 1820133675653636587L; + + public RemoteServiceAckTimeoutException(String message) { + super(message); + } + +} diff --git a/src/main/java/org/redisson/RemoteServiceKey.java b/src/main/java/org/redisson/remote/RemoteServiceKey.java similarity index 98% rename from src/main/java/org/redisson/RemoteServiceKey.java rename to src/main/java/org/redisson/remote/RemoteServiceKey.java index 5cadb3a5a..20f7ea9ff 100644 --- a/src/main/java/org/redisson/RemoteServiceKey.java +++ b/src/main/java/org/redisson/remote/RemoteServiceKey.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.redisson; +package org.redisson.remote; public class RemoteServiceKey { diff --git a/src/main/java/org/redisson/RemoteServiceMethod.java b/src/main/java/org/redisson/remote/RemoteServiceMethod.java similarity index 97% rename from src/main/java/org/redisson/RemoteServiceMethod.java rename to src/main/java/org/redisson/remote/RemoteServiceMethod.java index 153c82d19..26998214f 100644 --- a/src/main/java/org/redisson/RemoteServiceMethod.java +++ b/src/main/java/org/redisson/remote/RemoteServiceMethod.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.redisson; +package org.redisson.remote; import java.lang.reflect.Method; diff --git a/src/main/java/org/redisson/RemoteServiceRequest.java b/src/main/java/org/redisson/remote/RemoteServiceRequest.java similarity index 70% rename from src/main/java/org/redisson/RemoteServiceRequest.java rename to src/main/java/org/redisson/remote/RemoteServiceRequest.java index 434784456..f47324f10 100644 --- a/src/main/java/org/redisson/RemoteServiceRequest.java +++ b/src/main/java/org/redisson/remote/RemoteServiceRequest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.redisson; +package org.redisson.remote; import java.util.Arrays; @@ -22,15 +22,27 @@ public class RemoteServiceRequest { private String requestId; private String methodName; private Object[] args; + private long ackTimeout; + private long date; public RemoteServiceRequest() { } - public RemoteServiceRequest(String requestId, String methodName, Object[] args) { + public RemoteServiceRequest(String requestId, String methodName, Object[] args, long ackTimeout, long date) { super(); this.requestId = requestId; this.methodName = methodName; this.args = args; + this.ackTimeout = ackTimeout; + this.date = date; + } + + public long getDate() { + return date; + } + + public long getAckTimeout() { + return ackTimeout; } public String getRequestId() { @@ -47,8 +59,8 @@ public class RemoteServiceRequest { @Override public String toString() { - return "RemoteServiceRequest[requestId=" + requestId + ", methodName=" + methodName + ", args=" - + Arrays.toString(args) + "]"; + return "RemoteServiceRequest [requestId=" + requestId + ", methodName=" + methodName + ", args=" + + Arrays.toString(args) + ", ackTimeout=" + ackTimeout + ", date=" + date + "]"; } } diff --git a/src/main/java/org/redisson/RemoteServiceResponse.java b/src/main/java/org/redisson/remote/RemoteServiceResponse.java similarity index 92% rename from src/main/java/org/redisson/RemoteServiceResponse.java rename to src/main/java/org/redisson/remote/RemoteServiceResponse.java index 09ebc5999..55d074870 100644 --- a/src/main/java/org/redisson/RemoteServiceResponse.java +++ b/src/main/java/org/redisson/remote/RemoteServiceResponse.java @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.redisson; +package org.redisson.remote; -public class RemoteServiceResponse { +public class RemoteServiceResponse implements RRemoteServiceResponse { private Object result; private Throwable error; diff --git a/src/main/java/org/redisson/remote/RemoteServiceTimeoutException.java b/src/main/java/org/redisson/remote/RemoteServiceTimeoutException.java new file mode 100644 index 000000000..27316abf6 --- /dev/null +++ b/src/main/java/org/redisson/remote/RemoteServiceTimeoutException.java @@ -0,0 +1,17 @@ +package org.redisson.remote; + +/** + * Rises when invocation timeout has been occurred + * + * @author Nikita Koksharov + * + */ +public class RemoteServiceTimeoutException extends RuntimeException { + + private static final long serialVersionUID = -1749266931994840256L; + + public RemoteServiceTimeoutException(String message) { + super(message); + } + +} diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java index c8daedd90..5739bc7bc 100644 --- a/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -1,14 +1,14 @@ package org.redisson; -import org.junit.Assert; -import org.junit.Test; -import org.redisson.client.RedisTimeoutException; - -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; import java.util.concurrent.TimeUnit; +import org.junit.Assert; +import org.junit.Test; +import org.redisson.remote.RemoteServiceTimeoutException; + public class RedissonRemoteServiceTest extends BaseTest { public interface RemoteInterface { @@ -59,7 +59,7 @@ public class RedissonRemoteServiceTest extends BaseTest { } - @Test(expected = RedisTimeoutException.class) + @Test(expected = RemoteServiceTimeoutException.class) public void testTimeout() throws InterruptedException { RedissonClient r1 = Redisson.create(); r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());