From 835dfeaa8188521d76ecbd107c6875098a7d92fb Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 7 Dec 2016 11:31:41 +0300 Subject: [PATCH] Add timeout for subscribe operation #725 --- .../org/redisson/RedissonCountDownLatch.java | 6 +++--- .../src/main/java/org/redisson/RedissonLock.java | 2 +- .../java/org/redisson/RedissonPatternTopic.java | 2 +- .../RedissonPermitExpirableSemaphore.java | 2 +- .../java/org/redisson/RedissonSemaphore.java | 2 +- .../main/java/org/redisson/RedissonTopic.java | 2 +- .../redisson/command/CommandAsyncExecutor.java | 2 ++ .../redisson/command/CommandAsyncService.java | 16 ++++++++++++++++ 8 files changed, 26 insertions(+), 8 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java b/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java index 569d2ca62..e110550d6 100644 --- a/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -50,9 +50,9 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown } public void await() throws InterruptedException { - RFuture promise = subscribe(); + RFuture future = subscribe(); try { - get(promise); + commandExecutor.syncSubscription(future); while (getCount() > 0) { // waiting for open state @@ -62,7 +62,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown } } } finally { - unsubscribe(promise); + unsubscribe(future); } } diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index 7abc63daa..fabaa59b2 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -118,7 +118,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { long threadId = Thread.currentThread().getId(); RFuture future = subscribe(threadId); - get(future); + commandExecutor.syncSubscription(future); try { while (true) { diff --git a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java index b99b8438d..8c8c2a19f 100644 --- a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java @@ -64,7 +64,7 @@ public class RedissonPatternTopic implements RPatternTopic { private int addListener(RedisPubSubListener pubSubListener) { RFuture future = commandExecutor.getConnectionManager().psubscribe(name, codec, pubSubListener); - future.syncUninterruptibly(); + commandExecutor.syncSubscription(future); return System.identityHashCode(pubSubListener); } diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java index 384366440..92434bc1c 100644 --- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java @@ -91,7 +91,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen } RFuture future = subscribe(); - get(future); + commandExecutor.syncSubscription(future); try { while (true) { final Long nearestTimeout; diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java index 0d406fdac..e65706242 100644 --- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java @@ -79,7 +79,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { } RFuture future = subscribe(); - get(future); + commandExecutor.syncSubscription(future); try { while (true) { if (tryAcquire(permits)) { diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 5bd324cf4..bce6b1297 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -79,7 +79,7 @@ public class RedissonTopic implements RTopic { private int addListener(RedisPubSubListener pubSubListener) { RFuture future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener); - future.syncUninterruptibly(); + commandExecutor.syncSubscription(future); return System.identityHashCode(pubSubListener); } diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java index 49eca78d9..e36726524 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -50,6 +50,8 @@ public interface CommandAsyncExecutor { boolean await(RFuture RFuture, long timeout, TimeUnit timeoutUnit) throws InterruptedException; + void syncSubscription(RFuture future); + V get(RFuture RFuture); RFuture writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 1dc08ddb3..2d23afab0 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -48,6 +48,7 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; +import org.redisson.connection.PubSubConnectionEntry; import org.redisson.connection.NodeSource.Redirect; import org.redisson.misc.LogHelper; import org.redisson.misc.RPromise; @@ -70,6 +71,7 @@ import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; +import org.redisson.config.MasterSlaveServersConfig; import org.redisson.misc.RedissonObjectFactory; /** @@ -117,6 +119,20 @@ public class CommandAsyncService implements CommandAsyncExecutor { return redisson != null || redissonReactive != null; } + @Override + public void syncSubscription(RFuture future) { + MasterSlaveServersConfig config = connectionManager.getConfig(); + try { + int timeout = config.getTimeout() + config.getRetryInterval()*config.getRetryAttempts(); + if (!future.await(timeout)) { + throw new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms)"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + future.syncUninterruptibly(); + } + @Override public V get(RFuture future) { if (!future.isDone()) {