Add timeout for subscribe operation #725

pull/727/head
Nikita 8 years ago
parent a6cb812519
commit 835dfeaa81

@ -50,9 +50,9 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
public void await() throws InterruptedException {
RFuture<RedissonCountDownLatchEntry> promise = subscribe();
RFuture<RedissonCountDownLatchEntry> future = subscribe();
try {
get(promise);
commandExecutor.syncSubscription(future);
while (getCount() > 0) {
// waiting for open state
@ -62,7 +62,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
}
} finally {
unsubscribe(promise);
unsubscribe(future);
}
}

@ -118,7 +118,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
long threadId = Thread.currentThread().getId();
RFuture<RedissonLockEntry> future = subscribe(threadId);
get(future);
commandExecutor.syncSubscription(future);
try {
while (true) {

@ -64,7 +64,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
private int addListener(RedisPubSubListener<?> pubSubListener) {
RFuture<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().psubscribe(name, codec, pubSubListener);
future.syncUninterruptibly();
commandExecutor.syncSubscription(future);
return System.identityHashCode(pubSubListener);
}

@ -91,7 +91,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
RFuture<RedissonLockEntry> future = subscribe();
get(future);
commandExecutor.syncSubscription(future);
try {
while (true) {
final Long nearestTimeout;

@ -79,7 +79,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
RFuture<RedissonLockEntry> future = subscribe();
get(future);
commandExecutor.syncSubscription(future);
try {
while (true) {
if (tryAcquire(permits)) {

@ -79,7 +79,7 @@ public class RedissonTopic<M> implements RTopic<M> {
private int addListener(RedisPubSubListener<?> pubSubListener) {
RFuture<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener);
future.syncUninterruptibly();
commandExecutor.syncSubscription(future);
return System.identityHashCode(pubSubListener);
}

@ -50,6 +50,8 @@ public interface CommandAsyncExecutor {
boolean await(RFuture<?> RFuture, long timeout, TimeUnit timeoutUnit) throws InterruptedException;
void syncSubscription(RFuture<?> future);
<V> V get(RFuture<V> RFuture);
<T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);

@ -48,6 +48,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.connection.NodeSource.Redirect;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise;
@ -70,6 +71,7 @@ import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.RedissonObjectFactory;
/**
@ -117,6 +119,20 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return redisson != null || redissonReactive != null;
}
@Override
public void syncSubscription(RFuture<?> future) {
MasterSlaveServersConfig config = connectionManager.getConfig();
try {
int timeout = config.getTimeout() + config.getRetryInterval()*config.getRetryAttempts();
if (!future.await(timeout)) {
throw new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms)");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
future.syncUninterruptibly();
}
@Override
public <V> V get(RFuture<V> future) {
if (!future.isDone()) {

Loading…
Cancel
Save