Fixed - org.redisson.RedissonTopic.removeAllListeners got blocked on invocation. #1268

pull/1300/head
Nikita 7 years ago
parent ed247c0ce7
commit 707b419d97

@ -23,8 +23,10 @@ import org.redisson.api.RPatternTopic;
import org.redisson.api.listener.PatternMessageListener;
import org.redisson.api.listener.PatternStatusListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.pubsub.AsyncSemaphore;
@ -68,10 +70,18 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
return System.identityHashCode(pubSubListener);
}
protected void acquire(AsyncSemaphore semaphore) {
MasterSlaveServersConfig config = commandExecutor.getConnectionManager().getConfig();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
if (!semaphore.tryAcquire(timeout)) {
throw new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + name + " topic");
}
}
@Override
public void removeListener(int listenerId) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
@ -90,7 +100,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
@Override
public void removeAllListeners() {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
@ -109,7 +119,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
@Override
public void removeListener(PatternMessageListener<M> listener) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {

@ -24,9 +24,11 @@ import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.StatusListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
@ -126,7 +128,7 @@ public class RedissonTopic<M> implements RTopic<M> {
@Override
public void removeAllListeners() {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
@ -141,11 +143,19 @@ public class RedissonTopic<M> implements RTopic<M> {
semaphore.release();
}
}
protected void acquire(AsyncSemaphore semaphore) {
MasterSlaveServersConfig config = commandExecutor.getConnectionManager().getConfig();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
if (!semaphore.tryAcquire(timeout)) {
throw new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + name + " topic");
}
}
@Override
public void removeListener(MessageListener<?> listener) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
@ -165,7 +175,7 @@ public class RedissonTopic<M> implements RTopic<M> {
@Override
public void removeListener(int listenerId) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {

@ -19,6 +19,7 @@ import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
*
@ -34,19 +35,30 @@ public class AsyncSemaphore {
counter = permits;
}
public void acquireUninterruptibly() {
public boolean tryAcquire(long timeoutMillis) {
final CountDownLatch latch = new CountDownLatch(1);
acquire(new Runnable() {
final Runnable listener = new Runnable() {
@Override
public void run() {
latch.countDown();
}
});
};
acquire(listener);
try {
latch.await();
boolean res = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
if (!res) {
if (!remove(listener)) {
release();
}
}
return res;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (!remove(listener)) {
release();
}
return false;
}
}

@ -26,8 +26,10 @@ import org.redisson.api.RPatternTopicReactive;
import org.redisson.api.listener.PatternMessageListener;
import org.redisson.api.listener.PatternStatusListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -99,11 +101,19 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
}
});
}
protected void acquire(AsyncSemaphore semaphore) {
MasterSlaveServersConfig config = commandExecutor.getConnectionManager().getConfig();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
if (!semaphore.tryAcquire(timeout)) {
throw new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + name + " topic");
}
}
@Override
public void removeListener(int listenerId) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {

Loading…
Cancel
Save