From 4011dc7eed5655f38003cd5bcc83b35566967f49 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 15 May 2024 16:29:40 +0300 Subject: [PATCH] Fixed - Keyspace notifications should be listened only on master nodes #5877 --- .../java/org/redisson/RedissonObject.java | 5 +-- .../java/org/redisson/client/ChannelName.java | 10 +++++ .../client/handler/CommandPubSubDecoder.java | 2 +- .../pubsub/PublishSubscribeService.java | 40 ++++++++++--------- .../redisson/RedissonTopicPatternTest.java | 2 +- 5 files changed, 35 insertions(+), 24 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonObject.java b/redisson/src/main/java/org/redisson/RedissonObject.java index c6ccd6291..5b1a9b4ae 100644 --- a/redisson/src/main/java/org/redisson/RedissonObject.java +++ b/redisson/src/main/java/org/redisson/RedissonObject.java @@ -523,12 +523,11 @@ public abstract class RedissonObject implements RObject { protected final RFuture removeTrackingListenerAsync(int listenerId) { PublishSubscribeService subscribeService = commandExecutor.getConnectionManager().getSubscribeService(); - ChannelName cn = new ChannelName("__redis__:invalidate"); - if (!subscribeService.hasEntry(cn)) { + if (!subscribeService.hasEntry(ChannelName.TRACKING)) { return new CompletableFutureWrapper<>((Void) null); } - CompletableFuture f = subscribeService.removeListenerAsync(PubSubType.UNSUBSCRIBE, new ChannelName("__redis__:invalidate"), listenerId); + CompletableFuture f = subscribeService.removeListenerAsync(PubSubType.UNSUBSCRIBE, ChannelName.TRACKING, listenerId); f = f.whenComplete((r, e) -> { if (!commandExecutor.isTrackChanges()) { commandExecutor = commandExecutor.copy(false); diff --git a/redisson/src/main/java/org/redisson/client/ChannelName.java b/redisson/src/main/java/org/redisson/client/ChannelName.java index a5c74b03c..4a1284f36 100644 --- a/redisson/src/main/java/org/redisson/client/ChannelName.java +++ b/redisson/src/main/java/org/redisson/client/ChannelName.java @@ -26,6 +26,8 @@ import io.netty.util.CharsetUtil; */ public class ChannelName implements CharSequence { + public static final ChannelName TRACKING = new ChannelName("__redis__:invalidate"); + private final byte[] name; private final String str; @@ -85,4 +87,12 @@ public class ChannelName implements CharSequence { return toString().subSequence(start, end); } + public boolean isKeyspace() { + return str.startsWith("__keyspace") || str.startsWith("__keyevent"); + } + + public boolean isTracking() { + return str.equals(TRACKING.toString()); + } + } diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java index 5b87ba4b2..c84caa109 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java @@ -243,7 +243,7 @@ public class CommandPubSubDecoder extends CommandDecoder { if ("invalidate".equals(parts.get(0))) { parts.set(0, "message"); - parts.add(1, "__redis__:invalidate".getBytes()); + parts.add(1, ChannelName.TRACKING.getName()); } String command = parts.get(0).toString(); diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index 2fd8a1b7e..63bf636ff 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -203,7 +203,8 @@ public class PublishSubscribeService { List> futures = new ArrayList<>(); for (MasterSlaveEntry entry : entrySet) { - CompletableFuture future = subscribe(PubSubType.PSUBSCRIBE, codec, channelName, entry, null, ls); + CompletableFuture future = + subscribe(PubSubType.PSUBSCRIBE, codec, channelName, entry, entry.getEntry(), ls); futures.add(future); } CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); @@ -225,9 +226,7 @@ public class PublishSubscribeService { } public boolean isMultiEntity(ChannelName channelName) { - return connectionManager.isClusterMode() - && (channelName.toString().startsWith("__keyspace") - || channelName.toString().startsWith("__keyevent")); + return !connectionManager.getServiceManager().getCfg().isSingleConfig() && channelName.isKeyspace(); } public CompletableFuture subscribe(MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, @@ -238,8 +237,6 @@ public class PublishSubscribeService { private final Map> flushListeners = new ConcurrentHashMap<>(); public CompletableFuture subscribe(CommandAsyncExecutor commandExecutor, FlushListener listener) { - ChannelName channelName = new ChannelName("__redis__:invalidate"); - int listenerId = System.identityHashCode(listener); List> ffs = new ArrayList<>(); @@ -248,7 +245,7 @@ public class PublishSubscribeService { @Override public void onMessage(CharSequence channel, Object msg) { if (msg == null - && channel.equals(channelName.toString())) { + && channel.equals(ChannelName.TRACKING.toString())) { listener.onFlush(entry.getClient().getAddr()); } } @@ -259,7 +256,7 @@ public class PublishSubscribeService { listeners.add(entryListenerId); CompletableFuture future = subscribe(PubSubType.SUBSCRIBE, StringCodec.INSTANCE, - channelName, entry, entry.getEntry(), entryListener); + ChannelName.TRACKING, entry, entry.getEntry(), entryListener); ffs.add(future); } @@ -309,22 +306,20 @@ public class PublishSubscribeService { List> futures = new ArrayList<>(); for (Integer id : ids) { - CompletableFuture f = removeListenerAsync(PubSubType.UNSUBSCRIBE, new ChannelName("__redis__:invalidate"), id); + CompletableFuture f = removeListenerAsync(PubSubType.UNSUBSCRIBE, ChannelName.TRACKING, id); futures.add(f); } return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); } public CompletableFuture subscribe(String key, Codec codec, - CommandAsyncExecutor commandExecutor, TrackingListener listener) { + CommandAsyncExecutor commandExecutor, TrackingListener listener) { MasterSlaveEntry entry = connectionManager.getEntry(key); - ChannelName channelName = new ChannelName("__redis__:invalidate"); - RedisPubSubListener redisPubSubListener = new RedisPubSubListener() { @Override public void onMessage(CharSequence channel, Object msg) { - if (channel.equals(channelName.toString()) + if (channel.equals(ChannelName.TRACKING.toString()) && key.equals(msg)) { listener.onChange((String) msg); } @@ -343,7 +338,8 @@ public class PublishSubscribeService { List> ffs = new ArrayList<>(); for (ClientConnectionsEntry ee : entries) { - CompletableFuture future = subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, ee, redisPubSubListener); + CompletableFuture future = + subscribe(PubSubType.SUBSCRIBE, codec, ChannelName.TRACKING, entry, ee, redisPubSubListener); ffs.add(future); } @@ -372,7 +368,8 @@ public class PublishSubscribeService { List> futures = new ArrayList<>(); for (MasterSlaveEntry entry : entrySet) { - CompletableFuture future = subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, null, ls); + CompletableFuture future = + subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, entry.getEntry(), ls); futures.add(future); } CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); @@ -404,7 +401,8 @@ public class PublishSubscribeService { } private CompletableFuture subscribe(PubSubType type, Codec codec, ChannelName channelName, - MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, RedisPubSubListener... listeners) { + MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, + RedisPubSubListener... listeners) { CompletableFuture promise = new CompletableFuture<>(); AsyncSemaphore lock = getSemaphore(channelName); int timeout = config.getSubscriptionTimeout(); @@ -510,7 +508,7 @@ public class PublishSubscribeService { connEntry = name2PubSubConnection.get(new PubSubKey(channelName, entry)); } if (connEntry != null) { - if (clientEntry != null) { + if (clientEntry != null && channelName.isTracking()) { clientEntry.getTrackedConnectionsHolder().incUsage(); } connEntry.addListeners(channelName, promise, type, lock, listeners); @@ -548,7 +546,9 @@ public class PublishSubscribeService { if (clientEntry != null) { PubSubClientKey key = new PubSubClientKey(channelName, clientEntry); oldEntry = key2connection.putIfAbsent(key, freeEntry); - clientEntry.getTrackedConnectionsHolder().incUsage(); + if (channelName.isTracking()) { + clientEntry.getTrackedConnectionsHolder().incUsage(); + } } PubSubKey key = new PubSubKey(channelName, entry); @@ -613,7 +613,9 @@ public class PublishSubscribeService { if (clientEntry != null) { PubSubClientKey key = new PubSubClientKey(channelName, clientEntry); oldEntry = key2connection.putIfAbsent(key, entry); - clientEntry.getTrackedConnectionsHolder().incUsage(); + if (channelName.isTracking()) { + clientEntry.getTrackedConnectionsHolder().incUsage(); + } } PubSubKey key = new PubSubKey(channelName, msEntry); PubSubConnectionEntry oe = name2PubSubConnection.putIfAbsent(key, entry); diff --git a/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java b/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java index de86a9bc1..d0847648c 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java @@ -349,7 +349,7 @@ public class RedissonTopicPatternTest extends RedisDockerTest { t.removeAllListeners(); } -// @Test + @Test public void testReattachInClusterSlave() { testReattachInCluster(SubscriptionMode.SLAVE); }