diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 09a9c8d18..b80124260 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -23,6 +23,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.redisson.client.RedisException; +import org.redisson.client.RedisTimeoutException; import org.redisson.core.MessageListener; import org.redisson.core.RBlockingQueue; import org.redisson.core.RRemoteService; @@ -97,7 +99,7 @@ public class RedissonRemoteService implements RRemoteService { long clients = topic.publish(response); if (clients == 0) { - log.error("None of clients has not received a response for: {}", request); + log.error("None of clients has not received a response: {} for request: {}", response, request); } subscribe(remoteInterface, requestQueue); @@ -119,7 +121,8 @@ public class RedissonRemoteService implements RRemoteService { String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}"; RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); - requestQueue.add(new RemoteServiceRequest(requestId, method.getName(), args)); + RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args); + requestQueue.add(request); String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + requestId; final RTopic topic = redisson.getTopic(responseName); @@ -136,7 +139,10 @@ public class RedissonRemoteService implements RRemoteService { if (timeout == -1) { latch.await(); } else { - latch.await(timeout, timeUnit); + if (!latch.await(timeout, timeUnit)) { + topic.removeListener(listenerId); + throw new RedisTimeoutException("No response after " + timeUnit.toMillis(timeout) + "ms for request: " + request); + } } topic.removeListener(listenerId); RemoteServiceResponse msg = response.get(); diff --git a/src/main/java/org/redisson/RemoteServiceResponse.java b/src/main/java/org/redisson/RemoteServiceResponse.java index dd14bf06c..09ebc5999 100644 --- a/src/main/java/org/redisson/RemoteServiceResponse.java +++ b/src/main/java/org/redisson/RemoteServiceResponse.java @@ -38,5 +38,10 @@ public class RemoteServiceResponse { public Object getResult() { return result; } + + @Override + public String toString() { + return "RemoteServiceResponse [result=" + result + ", error=" + error + "]"; + } } diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index d7f0c6126..faba6d1fc 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -493,8 +493,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.getMainPromise().addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { + connectionManager.getShutdownPromise().removeListener(listener); if (!future.isCancelled()) { - connectionManager.getShutdownPromise().removeListener(listener); return; } // cancel handling for commands from skipTimeout collection diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java index b6cede708..c8daedd90 100644 --- a/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -2,9 +2,12 @@ package org.redisson; import org.junit.Assert; import org.junit.Test; +import org.redisson.client.RedisTimeoutException; + import static org.assertj.core.api.Assertions.*; import java.io.IOException; +import java.util.concurrent.TimeUnit; public class RedissonRemoteServiceTest extends BaseTest { @@ -18,6 +21,8 @@ public class RedissonRemoteServiceTest extends BaseTest { void errorMethodWithCause(); + void timeoutMethod() throws InterruptedException; + } public class RemoteImpl implements RemoteInterface { @@ -46,8 +51,29 @@ public class RedissonRemoteServiceTest extends BaseTest { } } + @Override + public void timeoutMethod() throws InterruptedException { + Thread.sleep(2000); + } + } + + @Test(expected = RedisTimeoutException.class) + public void testTimeout() throws InterruptedException { + RedissonClient r1 = Redisson.create(); + r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); + + RedissonClient r2 = Redisson.create(); + RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class, 1, TimeUnit.SECONDS); + + try { + ri.timeoutMethod(); + } finally { + r1.shutdown(); + r2.shutdown(); + } + } @Test public void testInvocations() { @@ -74,7 +100,6 @@ public class RedissonRemoteServiceTest extends BaseTest { assertThat(e.getCause()).isInstanceOf(ArithmeticException.class); assertThat(e.getCause().getMessage()).isEqualTo("/ by zero"); } - r1.shutdown(); r2.shutdown();