From 3470e8cc8c2dc3a945e57268cfbbf2e9139c0b1d Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 7 Mar 2024 16:51:49 +0300 Subject: [PATCH] Fixed - StatusListener doesn't work with RShardedTopic --- .../org/redisson/PubSubStatusListener.java | 4 +- .../client/protocol/RedisCommands.java | 1 + .../pubsub/PublishSubscribeService.java | 4 +- .../redisson/RedissonShardedTopicTest.java | 77 ++++++++++++++++--- 4 files changed, 71 insertions(+), 15 deletions(-) diff --git a/redisson/src/main/java/org/redisson/PubSubStatusListener.java b/redisson/src/main/java/org/redisson/PubSubStatusListener.java index 731e8b49a..7fe4b45c6 100644 --- a/redisson/src/main/java/org/redisson/PubSubStatusListener.java +++ b/redisson/src/main/java/org/redisson/PubSubStatusListener.java @@ -72,9 +72,9 @@ public class PubSubStatusListener implements RedisPubSubListener { @Override public void onStatus(PubSubType type, CharSequence channel) { if (channel.toString().equals(name)) { - if (type == PubSubType.SUBSCRIBE) { + if (type == PubSubType.SUBSCRIBE || type == PubSubType.SSUBSCRIBE || type == PubSubType.PSUBSCRIBE) { listener.onSubscribe(channel.toString()); - } else if (type == PubSubType.UNSUBSCRIBE) { + } else if (type == PubSubType.UNSUBSCRIBE || type == PubSubType.SUNSUBSCRIBE || type == PubSubType.PUNSUBSCRIBE) { listener.onUnsubscribe(channel.toString()); } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index b09a657a3..31f41373c 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -710,6 +710,7 @@ public interface RedisCommands { RedisStrictCommand SPUBLISH = new RedisStrictCommand("SPUBLISH"); RedisCommand PUBSUB_NUMSUB = new RedisCommand("PUBSUB", "NUMSUB", new ListObjectDecoder(1)); RedisCommand> PUBSUB_CHANNELS = new RedisStrictCommand<>("PUBSUB", "CHANNELS", new StringListReplayDecoder()); + RedisCommand> PUBSUB_SHARDCHANNELS = new RedisStrictCommand<>("PUBSUB", "SHARDCHANNELS", new StringListReplayDecoder()); RedisCommand SSUBSCRIBE = new RedisCommand("SSUBSCRIBE", new PubSubStatusDecoder()); RedisCommand SUBSCRIBE = new RedisCommand("SUBSCRIBE", new PubSubStatusDecoder()); diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index 476165b9d..691e8b778 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -784,7 +784,7 @@ public class PublishSubscribeService { if (scodec != null) { Queue> listeners = pubSubEntry.getListeners(entry.getKey().getChannelName()); unsubscribe(entry.getKey().getChannelName(), pubSubEntry, PubSubType.SUNSUBSCRIBE); - subscribe(codec, entry.getKey().getChannelName(), listeners.toArray(new RedisPubSubListener[0])); + ssubscribe(codec, entry.getKey().getChannelName(), listeners.toArray(new RedisPubSubListener[0])); } Codec patternCodec = pubSubEntry.getConnection().getPatternChannels().get(entry.getKey().getChannelName()); @@ -876,7 +876,7 @@ public class PublishSubscribeService { return; } - log.info("listeners of '{}' channel have been resubscribed to '{}'", channelName, res); + log.info("listeners of '{}' sharded-channel have been resubscribed to '{}'", channelName, res); }); } diff --git a/redisson/src/test/java/org/redisson/RedissonShardedTopicTest.java b/redisson/src/test/java/org/redisson/RedissonShardedTopicTest.java index 493d45125..5e4de6801 100644 --- a/redisson/src/test/java/org/redisson/RedissonShardedTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonShardedTopicTest.java @@ -3,21 +3,26 @@ package org.redisson; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.redisson.api.NatMapper; import org.redisson.api.RShardedTopic; import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; +import org.redisson.api.listener.MessageListener; +import org.redisson.api.listener.StatusListener; +import org.redisson.api.redisnode.RedisCluster; +import org.redisson.api.redisnode.RedisClusterMaster; +import org.redisson.api.redisnode.RedisNodes; +import org.redisson.client.RedisClient; +import org.redisson.client.RedisClientConfig; +import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; +import org.redisson.client.protocol.RedisCommands; import org.redisson.config.Config; -import org.redisson.connection.balancer.RandomLoadBalancer; -import org.redisson.misc.RedisURI; +import org.redisson.config.SubscriptionMode; import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import java.io.IOException; import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; @@ -29,10 +34,7 @@ public class RedissonShardedTopicTest extends RedisDockerTest { GenericContainer redis = createRedis("6.2"); redis.start(); - Config config = new Config(); - config.setProtocol(protocol); - config.useSingleServer() - .setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort()); + Config config = createConfig(redis); RedissonClient redisson = Redisson.create(config); RShardedTopic t = redisson.getShardedTopic("ttt"); @@ -46,6 +48,59 @@ public class RedissonShardedTopicTest extends RedisDockerTest { redis.stop(); } + @Test + public void testReattachInClusterMaster() { + withNewCluster(redissonClient -> { + Config cfg = redissonClient.getConfig(); + cfg.useClusterServers() + .setPingConnectionInterval(0) + .setSubscriptionMode(SubscriptionMode.MASTER); + + RedissonClient redisson = Redisson.create(cfg); + final AtomicBoolean executed = new AtomicBoolean(); + final AtomicInteger subscriptions = new AtomicInteger(); + + RTopic topic = redisson.getShardedTopic("3"); + topic.addListener(new StatusListener() { + + @Override + public void onUnsubscribe(String channel) { + } + + @Override + public void onSubscribe(String channel) { + subscriptions.incrementAndGet(); + } + }); + topic.addListener(Integer.class, new MessageListener() { + @Override + public void onMessage(CharSequence channel, Integer msg) { + executed.set(true); + } + }); + + RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER); + for (RedisClusterMaster master : rnc.getMasters()) { + RedisClientConfig cc = new RedisClientConfig(); + cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort()); + RedisClient c = RedisClient.create(cc); + RedisConnection cn = c.connect(); + List channels = cn.sync(RedisCommands.PUBSUB_SHARDCHANNELS); + if (channels.contains("3")) { + cn.async(RedisCommands.SHUTDOWN); + } + c.shutdown(); + } + + Awaitility.waitAtMost(Duration.ofSeconds(30)).until(() -> subscriptions.get() == 2); + + redisson.getShardedTopic("3").publish(1); + assertThat(executed.get()).isTrue(); + + redisson.shutdown(); + }); + } + @Test public void testClusterSharding() { testInCluster(redisson -> {