Fixed - Keyspace notifications should be listened only on master nodes #5877

pull/5884/head
Nikita Koksharov 9 months ago
parent fc80806ad0
commit 4011dc7eed

@ -523,12 +523,11 @@ public abstract class RedissonObject implements RObject {
protected final RFuture<Void> removeTrackingListenerAsync(int listenerId) {
PublishSubscribeService subscribeService = commandExecutor.getConnectionManager().getSubscribeService();
ChannelName cn = new ChannelName("__redis__:invalidate");
if (!subscribeService.hasEntry(cn)) {
if (!subscribeService.hasEntry(ChannelName.TRACKING)) {
return new CompletableFutureWrapper<>((Void) null);
}
CompletableFuture<Void> f = subscribeService.removeListenerAsync(PubSubType.UNSUBSCRIBE, new ChannelName("__redis__:invalidate"), listenerId);
CompletableFuture<Void> f = subscribeService.removeListenerAsync(PubSubType.UNSUBSCRIBE, ChannelName.TRACKING, listenerId);
f = f.whenComplete((r, e) -> {
if (!commandExecutor.isTrackChanges()) {
commandExecutor = commandExecutor.copy(false);

@ -26,6 +26,8 @@ import io.netty.util.CharsetUtil;
*/
public class ChannelName implements CharSequence {
public static final ChannelName TRACKING = new ChannelName("__redis__:invalidate");
private final byte[] name;
private final String str;
@ -85,4 +87,12 @@ public class ChannelName implements CharSequence {
return toString().subSequence(start, end);
}
public boolean isKeyspace() {
return str.startsWith("__keyspace") || str.startsWith("__keyevent");
}
public boolean isTracking() {
return str.equals(TRACKING.toString());
}
}

@ -243,7 +243,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
if ("invalidate".equals(parts.get(0))) {
parts.set(0, "message");
parts.add(1, "__redis__:invalidate".getBytes());
parts.add(1, ChannelName.TRACKING.getName());
}
String command = parts.get(0).toString();

@ -203,7 +203,8 @@ public class PublishSubscribeService {
List<CompletableFuture<PubSubConnectionEntry>> futures = new ArrayList<>();
for (MasterSlaveEntry entry : entrySet) {
CompletableFuture<PubSubConnectionEntry> future = subscribe(PubSubType.PSUBSCRIBE, codec, channelName, entry, null, ls);
CompletableFuture<PubSubConnectionEntry> future =
subscribe(PubSubType.PSUBSCRIBE, codec, channelName, entry, entry.getEntry(), ls);
futures.add(future);
}
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
@ -225,9 +226,7 @@ public class PublishSubscribeService {
}
public boolean isMultiEntity(ChannelName channelName) {
return connectionManager.isClusterMode()
&& (channelName.toString().startsWith("__keyspace")
|| channelName.toString().startsWith("__keyevent"));
return !connectionManager.getServiceManager().getCfg().isSingleConfig() && channelName.isKeyspace();
}
public CompletableFuture<PubSubConnectionEntry> subscribe(MasterSlaveEntry entry, ClientConnectionsEntry clientEntry,
@ -238,8 +237,6 @@ public class PublishSubscribeService {
private final Map<Integer, Collection<Integer>> flushListeners = new ConcurrentHashMap<>();
public CompletableFuture<Integer> subscribe(CommandAsyncExecutor commandExecutor, FlushListener listener) {
ChannelName channelName = new ChannelName("__redis__:invalidate");
int listenerId = System.identityHashCode(listener);
List<CompletableFuture<PubSubConnectionEntry>> ffs = new ArrayList<>();
@ -248,7 +245,7 @@ public class PublishSubscribeService {
@Override
public void onMessage(CharSequence channel, Object msg) {
if (msg == null
&& channel.equals(channelName.toString())) {
&& channel.equals(ChannelName.TRACKING.toString())) {
listener.onFlush(entry.getClient().getAddr());
}
}
@ -259,7 +256,7 @@ public class PublishSubscribeService {
listeners.add(entryListenerId);
CompletableFuture<PubSubConnectionEntry> future = subscribe(PubSubType.SUBSCRIBE, StringCodec.INSTANCE,
channelName, entry, entry.getEntry(), entryListener);
ChannelName.TRACKING, entry, entry.getEntry(), entryListener);
ffs.add(future);
}
@ -309,22 +306,20 @@ public class PublishSubscribeService {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (Integer id : ids) {
CompletableFuture<Void> f = removeListenerAsync(PubSubType.UNSUBSCRIBE, new ChannelName("__redis__:invalidate"), id);
CompletableFuture<Void> f = removeListenerAsync(PubSubType.UNSUBSCRIBE, ChannelName.TRACKING, id);
futures.add(f);
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
public CompletableFuture<Integer> subscribe(String key, Codec codec,
CommandAsyncExecutor commandExecutor, TrackingListener listener) {
CommandAsyncExecutor commandExecutor, TrackingListener listener) {
MasterSlaveEntry entry = connectionManager.getEntry(key);
ChannelName channelName = new ChannelName("__redis__:invalidate");
RedisPubSubListener<Object> redisPubSubListener = new RedisPubSubListener<Object>() {
@Override
public void onMessage(CharSequence channel, Object msg) {
if (channel.equals(channelName.toString())
if (channel.equals(ChannelName.TRACKING.toString())
&& key.equals(msg)) {
listener.onChange((String) msg);
}
@ -343,7 +338,8 @@ public class PublishSubscribeService {
List<CompletableFuture<PubSubConnectionEntry>> ffs = new ArrayList<>();
for (ClientConnectionsEntry ee : entries) {
CompletableFuture<PubSubConnectionEntry> future = subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, ee, redisPubSubListener);
CompletableFuture<PubSubConnectionEntry> future =
subscribe(PubSubType.SUBSCRIBE, codec, ChannelName.TRACKING, entry, ee, redisPubSubListener);
ffs.add(future);
}
@ -372,7 +368,8 @@ public class PublishSubscribeService {
List<CompletableFuture<PubSubConnectionEntry>> futures = new ArrayList<>();
for (MasterSlaveEntry entry : entrySet) {
CompletableFuture<PubSubConnectionEntry> future = subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, null, ls);
CompletableFuture<PubSubConnectionEntry> future =
subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, entry.getEntry(), ls);
futures.add(future);
}
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
@ -404,7 +401,8 @@ public class PublishSubscribeService {
}
private CompletableFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, ChannelName channelName,
MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, RedisPubSubListener<?>... listeners) {
MasterSlaveEntry entry, ClientConnectionsEntry clientEntry,
RedisPubSubListener<?>... listeners) {
CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
AsyncSemaphore lock = getSemaphore(channelName);
int timeout = config.getSubscriptionTimeout();
@ -510,7 +508,7 @@ public class PublishSubscribeService {
connEntry = name2PubSubConnection.get(new PubSubKey(channelName, entry));
}
if (connEntry != null) {
if (clientEntry != null) {
if (clientEntry != null && channelName.isTracking()) {
clientEntry.getTrackedConnectionsHolder().incUsage();
}
connEntry.addListeners(channelName, promise, type, lock, listeners);
@ -548,7 +546,9 @@ public class PublishSubscribeService {
if (clientEntry != null) {
PubSubClientKey key = new PubSubClientKey(channelName, clientEntry);
oldEntry = key2connection.putIfAbsent(key, freeEntry);
clientEntry.getTrackedConnectionsHolder().incUsage();
if (channelName.isTracking()) {
clientEntry.getTrackedConnectionsHolder().incUsage();
}
}
PubSubKey key = new PubSubKey(channelName, entry);
@ -613,7 +613,9 @@ public class PublishSubscribeService {
if (clientEntry != null) {
PubSubClientKey key = new PubSubClientKey(channelName, clientEntry);
oldEntry = key2connection.putIfAbsent(key, entry);
clientEntry.getTrackedConnectionsHolder().incUsage();
if (channelName.isTracking()) {
clientEntry.getTrackedConnectionsHolder().incUsage();
}
}
PubSubKey key = new PubSubKey(channelName, msEntry);
PubSubConnectionEntry oe = name2PubSubConnection.putIfAbsent(key, entry);

@ -349,7 +349,7 @@ public class RedissonTopicPatternTest extends RedisDockerTest {
t.removeAllListeners();
}
// @Test
@Test
public void testReattachInClusterSlave() {
testReattachInCluster(SubscriptionMode.SLAVE);
}

Loading…
Cancel
Save