From de64949aff8a549e65cd22b72c7eda55bcc2f761 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 16 Feb 2021 11:17:10 +0300 Subject: [PATCH] refactoring --- .../org/redisson/RedissonPatternTopic.java | 25 ++----- .../main/java/org/redisson/RedissonTopic.java | 9 +-- .../org/redisson/pubsub/PublishSubscribe.java | 2 +- .../pubsub/PublishSubscribeService.java | 69 ++++++------------- 4 files changed, 35 insertions(+), 70 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java index 74bbb6874..8e340919b 100644 --- a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java @@ -26,10 +26,12 @@ import org.redisson.client.ChannelName; import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisTimeoutException; import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.command.CommandAsyncExecutor; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; +import org.redisson.misc.TransferListener; import org.redisson.pubsub.AsyncSemaphore; import org.redisson.pubsub.PubSubConnectionEntry; import org.redisson.pubsub.PublishSubscribeService; @@ -125,7 +127,8 @@ public class RedissonPatternTopic implements RPatternTopic { entry.removeListener(channelName, listenerId); if (!entry.hasListeners(channelName)) { - subscribeService.punsubscribe(channelName, semaphore); + subscribeService.unsubscribe(PubSubType.PUNSUBSCRIBE, channelName, semaphore) + .onComplete(new TransferListener<>(result)); } else { semaphore.release(); result.trySuccess(null); @@ -136,21 +139,7 @@ public class RedissonPatternTopic implements RPatternTopic { @Override public void removeListener(int listenerId) { - AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); - acquire(semaphore); - - PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); - if (entry == null) { - semaphore.release(); - return; - } - - entry.removeListener(channelName, listenerId); - if (!entry.hasListeners(channelName)) { - subscribeService.punsubscribe(channelName, semaphore); - } else { - semaphore.release(); - } + removeListenerAsync(listenerId).syncUninterruptibly(); } @Override @@ -165,7 +154,7 @@ public class RedissonPatternTopic implements RPatternTopic { } if (entry.hasListeners(channelName)) { - subscribeService.punsubscribe(channelName, semaphore); + subscribeService.unsubscribe(PubSubType.PUNSUBSCRIBE, channelName, semaphore).syncUninterruptibly(); } else { semaphore.release(); } @@ -184,7 +173,7 @@ public class RedissonPatternTopic implements RPatternTopic { entry.removeListener(channelName, listener); if (!entry.hasListeners(channelName)) { - subscribeService.punsubscribe(channelName, semaphore); + subscribeService.unsubscribe(PubSubType.PUNSUBSCRIBE, channelName, semaphore).syncUninterruptibly(); } else { semaphore.release(); } diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 0c73b16e6..61f6c3645 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -26,6 +26,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.command.CommandAsyncExecutor; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.misc.RPromise; @@ -145,7 +146,7 @@ public class RedissonTopic implements RTopic { } if (entry.hasListeners(channelName)) { - subscribeService.unsubscribe(channelName, semaphore).syncUninterruptibly(); + subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore).syncUninterruptibly(); } else { semaphore.release(); } @@ -189,7 +190,7 @@ public class RedissonTopic implements RTopic { entry.removeListener(channelName, listener); if (!entry.hasListeners(channelName)) { - subscribeService.unsubscribe(channelName, semaphore) + subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore) .onComplete(new TransferListener(promise)); } else { semaphore.release(); @@ -219,7 +220,7 @@ public class RedissonTopic implements RTopic { entry.removeListener(channelName, id); } if (!entry.hasListeners(channelName)) { - subscribeService.unsubscribe(channelName, semaphore) + subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore) .onComplete(new TransferListener(promise)); } else { semaphore.release(); @@ -245,7 +246,7 @@ public class RedissonTopic implements RTopic { entry.removeListener(channelName, id); } if (!entry.hasListeners(channelName)) { - subscribeService.unsubscribe(channelName, semaphore).syncUninterruptibly(); + subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore).syncUninterruptibly(); } else { semaphore.release(); } diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java index 02a32ef45..04a817214 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java @@ -53,7 +53,7 @@ abstract class PublishSubscribe> { if (!removed) { throw new IllegalStateException(); } - service.unsubscribe(new ChannelName(channelName), semaphore); + service.unsubscribe(PubSubType.UNSUBSCRIBE, new ChannelName(channelName), semaphore); } else { semaphore.release(); } diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index dbe39ca9c..a924b9f6f 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -237,7 +237,7 @@ public class PublishSubscribeService { connEntry.removeListener(channelName, listener); } if (!connEntry.hasListeners(channelName)) { - unsubscribe(channelName, lock); + unsubscribe(type, channelName, lock); } else { lock.release(); } @@ -332,7 +332,7 @@ public class PublishSubscribeService { }); } - public RFuture unsubscribe(ChannelName channelName, AsyncSemaphore lock) { + public RFuture unsubscribe(PubSubType topicType, ChannelName channelName, AsyncSemaphore lock) { PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null || connectionManager.isShuttingDown()) { lock.release(); @@ -340,12 +340,12 @@ public class PublishSubscribeService { } AtomicBoolean executed = new AtomicBoolean(); - RedissonPromise result = new RedissonPromise(); - ChannelFuture future = entry.unsubscribe(channelName, new BaseRedisPubSubListener() { + RedissonPromise result = new RedissonPromise<>(); + BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { @Override public boolean onStatus(PubSubType type, CharSequence channel) { - if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) { + if (type == topicType && channel.equals(channelName)) { executed.set(true); if (entry.release() == 1) { @@ -359,25 +359,26 @@ public class PublishSubscribeService { return false; } - }); + }; - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - return; - } + ChannelFuture future; + if (topicType == PubSubType.UNSUBSCRIBE) { + future = entry.unsubscribe(channelName, listener); + } else { + future = entry.punsubscribe(channelName, listener); + } - connectionManager.newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - if (executed.get()) { - return; - } - entry.getConnection().onMessage(new PubSubStatusMessage(PubSubType.UNSUBSCRIBE, channelName)); - } - }, config.getTimeout(), TimeUnit.MILLISECONDS); + future.addListener((ChannelFutureListener) f -> { + if (!f.isSuccess()) { + return; } + + connectionManager.newTimeout(timeout -> { + if (executed.get()) { + return; + } + entry.getConnection().onMessage(new PubSubStatusMessage(topicType, channelName)); + }, config.getTimeout(), TimeUnit.MILLISECONDS); }); return result; @@ -468,32 +469,6 @@ public class PublishSubscribeService { return result; } - public void punsubscribe(ChannelName channelName, AsyncSemaphore lock) { - PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); - if (entry == null || connectionManager.isShuttingDown()) { - lock.release(); - return; - } - - entry.punsubscribe(channelName, new BaseRedisPubSubListener() { - - @Override - public boolean onStatus(PubSubType type, CharSequence channel) { - if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) { - - if (entry.release() == 1) { - addFreeConnectionEntry(channelName, entry); - } - - lock.release(); - return true; - } - return false; - } - - }); - } - private void addFreeConnectionEntry(ChannelName channelName, PubSubConnectionEntry entry) { int slot = connectionManager.calcSlot(channelName.getName()); MasterSlaveEntry me = connectionManager.getEntry(slot);