refactoring

pull/3438/head
Nikita Koksharov 4 years ago
parent de64949aff
commit ba7d315370

@ -113,33 +113,14 @@ public class RedissonPatternTopic implements RPatternTopic {
}
}
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
RPromise<Void> result = new RedissonPromise<>();
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
semaphore.acquire(() -> {
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry == null) {
semaphore.release();
result.trySuccess(null);
return;
}
entry.removeListener(channelName, listenerId);
if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(PubSubType.PUNSUBSCRIBE, channelName, semaphore)
.onComplete(new TransferListener<>(result));
} else {
semaphore.release();
result.trySuccess(null);
}
});
return result;
return subscribeService.removeListenerAsync(PubSubType.PUNSUBSCRIBE, channelName, listenerId);
}
@Override
public void removeListener(int listenerId) {
removeListenerAsync(listenerId).syncUninterruptibly();
commandExecutor.syncSubscription(removeListenerAsync(listenerId));
}
@Override
@ -162,22 +143,8 @@ public class RedissonPatternTopic implements RPatternTopic {
@Override
public void removeListener(PatternMessageListener<?> listener) {
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
acquire(semaphore);
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry == null) {
semaphore.release();
return;
}
entry.removeListener(channelName, listener);
if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(PubSubType.PUNSUBSCRIBE, channelName, semaphore).syncUninterruptibly();
} else {
semaphore.release();
}
RFuture<Void> future = subscribeService.removeListenerAsync(PubSubType.PUNSUBSCRIBE, channelName, listener);
commandExecutor.syncSubscription(future);
}
@Override

@ -163,93 +163,22 @@ public class RedissonTopic implements RTopic {
@Override
public void removeListener(MessageListener<?> listener) {
RFuture<Void> future = removeListenerAsync(listener);
MasterSlaveServersConfig config = commandExecutor.getConnectionManager().getConfig();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
if (!future.awaitUninterruptibly(timeout)) {
throw new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + name + " topic");
}
commandExecutor.syncSubscription(future);
}
@Override
public RFuture<Void> removeListenerAsync(MessageListener<?> listener) {
return removeListenerAsync(channelName, listener);
}
protected RFuture<Void> removeListenerAsync(ChannelName channelName, MessageListener<?> listener) {
RPromise<Void> promise = new RedissonPromise<Void>();
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
semaphore.acquire(new Runnable() {
@Override
public void run() {
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry == null) {
semaphore.release();
promise.trySuccess(null);
return;
}
entry.removeListener(channelName, listener);
if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore)
.onComplete(new TransferListener<Void>(promise));
} else {
semaphore.release();
promise.trySuccess(null);
}
}
});
return promise;
return subscribeService.removeListenerAsync(PubSubType.UNSUBSCRIBE, channelName, listener);
}
@Override
public RFuture<Void> removeListenerAsync(Integer... listenerIds) {
RPromise<Void> promise = new RedissonPromise<Void>();
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
semaphore.acquire(new Runnable() {
@Override
public void run() {
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry == null) {
semaphore.release();
promise.trySuccess(null);
return;
}
for (int id : listenerIds) {
entry.removeListener(channelName, id);
}
if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore)
.onComplete(new TransferListener<Void>(promise));
} else {
semaphore.release();
promise.trySuccess(null);
}
}
});
return promise;
return subscribeService.removeListenerAsync(PubSubType.UNSUBSCRIBE, channelName, listenerIds);
}
@Override
public void removeListener(Integer... listenerIds) {
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
acquire(semaphore);
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry == null) {
semaphore.release();
return;
}
for (int id : listenerIds) {
entry.removeListener(channelName, id);
}
if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore).syncUninterruptibly();
} else {
semaphore.release();
}
commandExecutor.syncSubscription(removeListenerAsync(listenerIds));
}
@Override

@ -16,6 +16,7 @@
package org.redisson.pubsub;
import java.util.Collection;
import java.util.EventListener;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.api.RFuture;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisNodeNotFoundException;
@ -39,6 +41,7 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -575,6 +578,54 @@ public class PublishSubscribeService {
});
}
public RFuture<Void> removeListenerAsync(PubSubType type, ChannelName channelName, EventListener listener) {
RPromise<Void> promise = new RedissonPromise<>();
AsyncSemaphore semaphore = getSemaphore(channelName);
semaphore.acquire(() -> {
PubSubConnectionEntry entry = getPubSubEntry(channelName);
if (entry == null) {
semaphore.release();
promise.trySuccess(null);
return;
}
entry.removeListener(channelName, listener);
if (!entry.hasListeners(channelName)) {
unsubscribe(type, channelName, semaphore)
.onComplete(new TransferListener<>(promise));
} else {
semaphore.release();
promise.trySuccess(null);
}
});
return promise;
}
public RFuture<Void> removeListenerAsync(PubSubType type, ChannelName channelName, Integer... listenerIds) {
RPromise<Void> promise = new RedissonPromise<>();
AsyncSemaphore semaphore = getSemaphore(channelName);
semaphore.acquire(() -> {
PubSubConnectionEntry entry = getPubSubEntry(channelName);
if (entry == null) {
semaphore.release();
promise.trySuccess(null);
return;
}
for (int id : listenerIds) {
entry.removeListener(channelName, id);
}
if (!entry.hasListeners(channelName)) {
unsubscribe(type, channelName, semaphore)
.onComplete(new TransferListener<>(promise));
} else {
semaphore.release();
promise.trySuccess(null);
}
});
return promise;
}
@Override
public String toString() {
return "PublishSubscribeService [name2PubSubConnection=" + name2PubSubConnection + ", entry2PubSubConnection=" + entry2PubSubConnection + "]";

Loading…
Cancel
Save