From 76a29978f6c6adaabdc72e0738993c9fe75fe107 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 17 Feb 2023 16:24:29 +0300 Subject: [PATCH] refactoring --- .../data/connection/RedissonSubscription.java | 4 +- .../data/connection/RedissonSubscription.java | 4 +- .../main/java/org/redisson/RedissonTopic.java | 6 +- .../org/redisson/pubsub/PublishSubscribe.java | 2 +- .../pubsub/PublishSubscribeService.java | 55 +++++++++++++------ 5 files changed, 45 insertions(+), 26 deletions(-) diff --git a/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 3003b0814..caca30129 100644 --- a/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -53,7 +53,7 @@ public class RedissonSubscription extends AbstractSubscription { List> list = new ArrayList<>(); Queue subscribed = new ConcurrentLinkedQueue<>(); for (byte[] channel : channels) { - if (subscribeService.getPubSubEntry(new ChannelName(channel)) != null) { + if (subscribeService.hasEntry(new ChannelName(channel))) { continue; } @@ -111,7 +111,7 @@ public class RedissonSubscription extends AbstractSubscription { List> list = new ArrayList<>(); Queue subscribed = new ConcurrentLinkedQueue<>(); for (byte[] channel : patterns) { - if (subscribeService.getPubSubEntry(new ChannelName(channel)) != null) { + if (subscribeService.hasEntry(new ChannelName(channel))) { continue; } diff --git a/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 3003b0814..caca30129 100644 --- a/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -53,7 +53,7 @@ public class RedissonSubscription extends AbstractSubscription { List> list = new ArrayList<>(); Queue subscribed = new ConcurrentLinkedQueue<>(); for (byte[] channel : channels) { - if (subscribeService.getPubSubEntry(new ChannelName(channel)) != null) { + if (subscribeService.hasEntry(new ChannelName(channel))) { continue; } @@ -111,7 +111,7 @@ public class RedissonSubscription extends AbstractSubscription { List> list = new ArrayList<>(); Queue subscribed = new ConcurrentLinkedQueue<>(); for (byte[] channel : patterns) { - if (subscribeService.getPubSubEntry(new ChannelName(channel)) != null) { + if (subscribeService.hasEntry(new ChannelName(channel))) { continue; } diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 5619f3339..e27cbb74f 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -166,11 +166,7 @@ public class RedissonTopic implements RTopic { @Override public int countListeners() { - PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); - if (entry != null) { - return entry.countListeners(channelName); - } - return 0; + return subscribeService.countListeners(channelName); } @Override diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java index d9cf0d053..e9ed6f2f6 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java @@ -47,7 +47,7 @@ abstract class PublishSubscribe> { semaphore.acquire().thenAccept(c -> { if (entry.release() == 0) { entries.remove(entryName); - service.unsubscribe(PubSubType.UNSUBSCRIBE, new ChannelName(channelName)) + service.unsubscribeLocked(PubSubType.UNSUBSCRIBE, new ChannelName(channelName)) .whenComplete((r, e) -> { semaphore.release(); }); diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index 00221ab35..d4d7fabbe 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -91,6 +91,7 @@ public class PublishSubscribeService { private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1); + private final Map name2entry = new ConcurrentHashMap<>(); private final ConcurrentMap name2PubSubConnection = new ConcurrentHashMap<>(); private final ConcurrentMap entry2PubSubConnection = new ConcurrentHashMap<>(); @@ -122,10 +123,22 @@ public class PublishSubscribeService { return semaphorePubSub; } - public PubSubConnectionEntry getPubSubEntry(ChannelName channelName) { + private PubSubConnectionEntry getPubSubEntry(ChannelName channelName) { return name2PubSubConnection.get(createKey(channelName)); } + public int countListeners(ChannelName channelName) { + PubSubConnectionEntry entry = getPubSubEntry(channelName); + if (entry != null) { + return entry.countListeners(channelName); + } + return 0; + } + + public boolean hasEntry(ChannelName channelName) { + return getPubSubEntry(channelName) != null; + } + public CompletableFuture> psubscribe(ChannelName channelName, Codec codec, RedisPubSubListener... listeners) { if (isMultiEntity(channelName)) { Collection entrySet = connectionManager.getEntrySet(); @@ -213,7 +226,8 @@ public class PublishSubscribeService { return; } - subscribe(codec, channelName, entry, promise, type, lock, new AtomicInteger(), listeners); + subscribeNoTimeout(codec, channelName, entry, promise, type, lock, new AtomicInteger(), listeners); + timeout(promise); }); return promise; } @@ -242,13 +256,6 @@ public class PublishSubscribeService { return new PubSubKey(channelName, entry); } - private void subscribe(Codec codec, ChannelName channelName, MasterSlaveEntry entry, - CompletableFuture promise, PubSubType type, - AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener... listeners) { - subscribeNoTimeout(codec, channelName, entry, promise, type, lock, attempts, listeners); - timeout(promise); - } - public void timeout(CompletableFuture promise) { int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts(); timeout(promise, timeout); @@ -345,6 +352,8 @@ public class PublishSubscribeService { return; } + name2entry.put(channelName, entry); + if (remainFreeAmount == 0) { freePubSubConnections.getEntries().poll(); } @@ -386,7 +395,7 @@ public class PublishSubscribeService { connEntry.removeListener(channelName, listener); } if (!connEntry.hasListeners(channelName)) { - unsubscribe(type, channelName) + unsubscribeLocked(type, channelName) .whenComplete((r, ex) -> { lock.release(); }); @@ -427,6 +436,8 @@ public class PublishSubscribeService { return; } + name2entry.put(channelName, msEntry); + if (remainFreeAmount > 0) { PubSubEntry psEntry = entry2PubSubConnection.computeIfAbsent(msEntry, e -> new PubSubEntry()); psEntry.getEntries().add(entry); @@ -445,13 +456,24 @@ public class PublishSubscribeService { return connFuture; } - public CompletableFuture unsubscribe(PubSubType topicType, ChannelName channelName) { - PubSubConnectionEntry entry = name2PubSubConnection.remove(createKey(channelName)); + public CompletableFuture unsubscribeLocked(PubSubType topicType, ChannelName channelName) { + MasterSlaveEntry entry = name2entry.get(channelName); + if (entry == null) { + RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); + CompletableFuture promise = new CompletableFuture<>(); + promise.completeExceptionally(ex); + return promise; + } + + return unsubscribeLocked(topicType, channelName, entry); + } + + private CompletableFuture unsubscribeLocked(PubSubType topicType, ChannelName channelName, MasterSlaveEntry msEntry) { + PubSubConnectionEntry entry = name2PubSubConnection.remove(new PubSubKey(channelName, msEntry)); if (entry == null || connectionManager.getServiceManager().isShuttingDown()) { return CompletableFuture.completedFuture(null); } - MasterSlaveEntry msEntry = getEntry(channelName); CompletableFuture result = new CompletableFuture<>(); BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { @@ -500,10 +522,11 @@ public class PublishSubscribeService { public void remove(MasterSlaveEntry entry) { entry2PubSubConnection.remove(entry); + name2entry.values().remove(entry); } public CompletableFuture unsubscribe(ChannelName channelName, PubSubType topicType) { - MasterSlaveEntry entry = getEntry(channelName); + MasterSlaveEntry entry = name2entry.get(channelName); if (entry == null) { RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); CompletableFuture promise = new CompletableFuture<>(); @@ -740,7 +763,7 @@ public class PublishSubscribeService { CompletableFuture f; if (!entry.hasListeners(channelName)) { - f = unsubscribe(type, channelName) + f = unsubscribeLocked(type, channelName, e) .exceptionally(ex -> null); } else { f = CompletableFuture.completedFuture(null); @@ -770,7 +793,7 @@ public class PublishSubscribeService { } if (entry.hasListeners(channelName)) { - CompletableFuture ff = unsubscribe(type, channelName); + CompletableFuture ff = unsubscribeLocked(type, channelName); return ff.whenComplete((r1, e1) -> { semaphore.release(); });