From 4003686671e11f0e17e8663977bba5fc423e2338 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Sat, 6 Jan 2024 15:04:18 +0300 Subject: [PATCH] refactoring --- .../pubsub/PubSubConnectionEntry.java | 82 ++++++++++++++--- .../pubsub/PublishSubscribeService.java | 89 +++---------------- 2 files changed, 83 insertions(+), 88 deletions(-) diff --git a/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java b/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java index 06f4f36ee..d99b96107 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java +++ b/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java @@ -17,14 +17,16 @@ package org.redisson.pubsub; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import org.redisson.misc.WrappedLock; import org.redisson.PubSubMessageListener; import org.redisson.PubSubPatternMessageListener; import org.redisson.client.*; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.client.protocol.pubsub.PubSubType; +import org.redisson.connection.ConnectionManager; import org.redisson.connection.ServiceManager; +import org.redisson.misc.AsyncSemaphore; +import org.redisson.misc.WrappedLock; import java.util.EventListener; import java.util.LinkedList; @@ -54,11 +56,13 @@ public class PubSubConnectionEntry { private static final Queue> EMPTY_QUEUE = new LinkedList<>(); private final ServiceManager serviceManager; + private final PublishSubscribeService subscribeService; - public PubSubConnectionEntry(RedisPubSubConnection conn, ServiceManager serviceManager) { + public PubSubConnectionEntry(RedisPubSubConnection conn, ConnectionManager connectionManager) { super(); this.conn = conn; - this.serviceManager = serviceManager; + this.serviceManager = connectionManager.getServiceManager(); + this.subscribeService = connectionManager.getSubscribeService(); this.subscribedChannelsAmount = new AtomicInteger(serviceManager.getConfig().getSubscriptionsPerConnection()); } @@ -161,9 +165,31 @@ public class PubSubConnectionEntry { return subscribedChannelsAmount.get() == serviceManager.getConfig().getSubscriptionsPerConnection(); } - public void subscribe(Codec codec, PubSubType type, ChannelName channelName, CompletableFuture subscribeFuture) { - ChannelFuture future; + public void subscribe(Codec codec, ChannelName channelName, CompletableFuture pm, + PubSubType type, AsyncSemaphore lock, RedisPubSubListener[] listeners) { + CompletableFuture pp = new CompletableFuture<>(); + pp.whenComplete((r, e) -> { + if (e != null) { + PubSubType unsubscribeType = PublishSubscribeService.SUBSCRIBE2UNSUBSCRIBE.get(type); + CompletableFuture f = subscribeService.unsubscribe(channelName, unsubscribeType); + f.whenComplete((rr, ee) -> { + pm.completeExceptionally(e); + }); + return; + } + + pm.complete(r); + }); + + CompletableFuture subscribeFuture = addListeners(channelName, pp, type, lock, listeners); CompletableFuture promise = new CompletableFuture<>(); + promise.whenComplete((r, ex) -> { + if (ex != null) { + subscribeFuture.completeExceptionally(ex); + } + }); + + ChannelFuture future; if (PubSubType.SUBSCRIBE == type) { future = conn.subscribe(promise, codec, channelName); } else if (PubSubType.SSUBSCRIBE == type) { @@ -171,13 +197,6 @@ public class PubSubConnectionEntry { } else { future = conn.psubscribe(promise, codec, channelName); } - - promise.whenComplete((r, ex) -> { - if (ex != null) { - subscribeFuture.completeExceptionally(ex); - } - }); - future.addListener((ChannelFutureListener) future1 -> { if (!future1.isSuccess()) { subscribeFuture.completeExceptionally(future1.cause()); @@ -192,7 +211,7 @@ public class PubSubConnectionEntry { }); } - public SubscribeListener getSubscribeFuture(ChannelName channel, PubSubType type) { + private SubscribeListener getSubscribeFuture(ChannelName channel, PubSubType type) { return subscribeChannelListeners.computeIfAbsent(channel, k -> { SubscribeListener listener = new SubscribeListener(channel, type); conn.addListener(listener); @@ -257,4 +276,41 @@ public class PubSubConnectionEntry { public String toString() { return "PubSubConnectionEntry [subscribedChannelsAmount=" + subscribedChannelsAmount + ", conn=" + conn + "]"; } + + public CompletableFuture addListeners(ChannelName channelName, + CompletableFuture promise, + PubSubType type, AsyncSemaphore lock, + RedisPubSubListener... listeners) { + for (RedisPubSubListener listener : listeners) { + addListener(channelName, listener); + } + SubscribeListener list = getSubscribeFuture(channelName, type); + CompletableFuture subscribeFuture = list.getSuccessFuture(); + + subscribeFuture.whenComplete((res, e) -> { + if (e != null) { + promise.completeExceptionally(e); + lock.release(); + return; + } + + if (!promise.complete(this)) { + for (RedisPubSubListener listener : listeners) { + removeListener(channelName, listener); + } + if (!hasListeners(channelName)) { + subscribeService.unsubscribeLocked(type, channelName) + .whenComplete((r, ex) -> { + lock.release(); + }); + } else { + lock.release(); + } + } else { + lock.release(); + } + }); + return subscribeFuture; + } + } diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index 229d29c99..a18162aa0 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -115,7 +115,13 @@ public class PublishSubscribeService { private boolean shardingSupported = false; - private final Map subscribe2unsubscribe = new HashMap<>(); + public static final Map SUBSCRIBE2UNSUBSCRIBE = new HashMap<>(); + + static { + SUBSCRIBE2UNSUBSCRIBE.put(PubSubType.SUBSCRIBE, PubSubType.UNSUBSCRIBE); + SUBSCRIBE2UNSUBSCRIBE.put(PubSubType.SSUBSCRIBE, PubSubType.SUNSUBSCRIBE); + SUBSCRIBE2UNSUBSCRIBE.put(PubSubType.PSUBSCRIBE, PubSubType.PUNSUBSCRIBE); + } public PublishSubscribeService(ConnectionManager connectionManager) { super(); @@ -124,10 +130,6 @@ public class PublishSubscribeService { for (int i = 0; i < locks.length; i++) { locks[i] = new AsyncSemaphore(1); } - - subscribe2unsubscribe.put(PubSubType.SUBSCRIBE, PubSubType.UNSUBSCRIBE); - subscribe2unsubscribe.put(PubSubType.SSUBSCRIBE, PubSubType.SUNSUBSCRIBE); - subscribe2unsubscribe.put(PubSubType.PSUBSCRIBE, PubSubType.PUNSUBSCRIBE); } public LockPubSub getLockPubSub() { @@ -370,7 +372,7 @@ public class PublishSubscribeService { PubSubType type, AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener... listeners) { PubSubConnectionEntry connEntry = name2PubSubConnection.get(new PubSubKey(channelName, entry)); if (connEntry != null) { - addListeners(channelName, promise, type, lock, connEntry, listeners); + connEntry.addListeners(channelName, promise, type, lock, listeners); return; } @@ -401,7 +403,7 @@ public class PublishSubscribeService { freeEntry.release(); freePubSubLock.release(); - addListeners(channelName, promise, type, lock, oldEntry, listeners); + oldEntry.addListeners(channelName, promise, type, lock, listeners); return; } @@ -413,21 +415,7 @@ public class PublishSubscribeService { } freePubSubLock.release(); - CompletableFuture pp = new CompletableFuture<>(); - pp.whenComplete((r, e) -> { - if (e != null) { - PubSubType unsubscribeType = subscribe2unsubscribe.get(type); - CompletableFuture f = unsubscribe(channelName, unsubscribeType); - f.whenComplete((rr, ee) -> { - promise.completeExceptionally(e); - }); - return; - } - - promise.complete(r); - }); - CompletableFuture subscribeFuture = addListeners(channelName, pp, type, lock, freeEntry, listeners); - freeEntry.subscribe(codec, type, channelName, subscribeFuture); + freeEntry.subscribe(codec, channelName, promise, type, lock, listeners); }); } @@ -436,41 +424,6 @@ public class PublishSubscribeService { return connectionManager.getWriteEntry(slot); } - private CompletableFuture addListeners(ChannelName channelName, CompletableFuture promise, - PubSubType type, AsyncSemaphore lock, PubSubConnectionEntry connEntry, - RedisPubSubListener... listeners) { - for (RedisPubSubListener listener : listeners) { - connEntry.addListener(channelName, listener); - } - SubscribeListener list = connEntry.getSubscribeFuture(channelName, type); - CompletableFuture subscribeFuture = list.getSuccessFuture(); - - subscribeFuture.whenComplete((res, e) -> { - if (e != null) { - promise.completeExceptionally(e); - lock.release(); - return; - } - - if (!promise.complete(connEntry)) { - for (RedisPubSubListener listener : listeners) { - connEntry.removeListener(channelName, listener); - } - if (!connEntry.hasListeners(channelName)) { - unsubscribeLocked(type, channelName) - .whenComplete((r, ex) -> { - lock.release(); - }); - } else { - lock.release(); - } - } else { - lock.release(); - } - }); - return subscribeFuture; - } - private void connect(Codec codec, ChannelName channelName, MasterSlaveEntry msEntry, ClientConnectionsEntry clientEntry, CompletableFuture promise, @@ -495,17 +448,17 @@ public class PublishSubscribeService { connFuture.thenAccept(conn -> { freePubSubLock.acquire().thenAccept(c -> { - PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, connectionManager.getServiceManager()); + PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, connectionManager); int remainFreeAmount = entry.tryAcquire(); PubSubKey key = new PubSubKey(channelName, msEntry); PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(key, entry); if (oldEntry != null) { - msEntry.returnPubSubConnection(conn); + msEntry.returnPubSubConnection(entry.getConnection()); freePubSubLock.release(); - addListeners(channelName, promise, type, lock, oldEntry, listeners); + oldEntry.addListeners(channelName, promise, type, lock, listeners); return; } @@ -518,21 +471,7 @@ public class PublishSubscribeService { } freePubSubLock.release(); - CompletableFuture pp = new CompletableFuture<>(); - pp.whenComplete((r, e) -> { - if (e != null) { - PubSubType unsubscribeType = subscribe2unsubscribe.get(type); - CompletableFuture f = unsubscribe(channelName, unsubscribeType); - f.whenComplete((rr, ee) -> { - promise.completeExceptionally(e); - }); - return; - } - - promise.complete(r); - }); - CompletableFuture subscribeFuture = addListeners(channelName, pp, type, lock, entry, listeners); - entry.subscribe(codec, type, channelName, subscribeFuture); + entry.subscribe(codec, channelName, promise, type, lock, listeners); }); }); }