Fixed - StatusListener doesn't work with RShardedTopic

pull/5676/head
Nikita Koksharov 1 year ago
parent 09d81ba805
commit 3470e8cc8c

@ -72,9 +72,9 @@ public class PubSubStatusListener implements RedisPubSubListener<Object> {
@Override @Override
public void onStatus(PubSubType type, CharSequence channel) { public void onStatus(PubSubType type, CharSequence channel) {
if (channel.toString().equals(name)) { if (channel.toString().equals(name)) {
if (type == PubSubType.SUBSCRIBE) { if (type == PubSubType.SUBSCRIBE || type == PubSubType.SSUBSCRIBE || type == PubSubType.PSUBSCRIBE) {
listener.onSubscribe(channel.toString()); listener.onSubscribe(channel.toString());
} else if (type == PubSubType.UNSUBSCRIBE) { } else if (type == PubSubType.UNSUBSCRIBE || type == PubSubType.SUNSUBSCRIBE || type == PubSubType.PUNSUBSCRIBE) {
listener.onUnsubscribe(channel.toString()); listener.onUnsubscribe(channel.toString());
} }
} }

@ -710,6 +710,7 @@ public interface RedisCommands {
RedisStrictCommand<Long> SPUBLISH = new RedisStrictCommand<Long>("SPUBLISH"); RedisStrictCommand<Long> SPUBLISH = new RedisStrictCommand<Long>("SPUBLISH");
RedisCommand<Long> PUBSUB_NUMSUB = new RedisCommand<Long>("PUBSUB", "NUMSUB", new ListObjectDecoder<Long>(1)); RedisCommand<Long> PUBSUB_NUMSUB = new RedisCommand<Long>("PUBSUB", "NUMSUB", new ListObjectDecoder<Long>(1));
RedisCommand<List<String>> PUBSUB_CHANNELS = new RedisStrictCommand<>("PUBSUB", "CHANNELS", new StringListReplayDecoder()); RedisCommand<List<String>> PUBSUB_CHANNELS = new RedisStrictCommand<>("PUBSUB", "CHANNELS", new StringListReplayDecoder());
RedisCommand<List<String>> PUBSUB_SHARDCHANNELS = new RedisStrictCommand<>("PUBSUB", "SHARDCHANNELS", new StringListReplayDecoder());
RedisCommand<Object> SSUBSCRIBE = new RedisCommand<Object>("SSUBSCRIBE", new PubSubStatusDecoder()); RedisCommand<Object> SSUBSCRIBE = new RedisCommand<Object>("SSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> SUBSCRIBE = new RedisCommand<Object>("SUBSCRIBE", new PubSubStatusDecoder()); RedisCommand<Object> SUBSCRIBE = new RedisCommand<Object>("SUBSCRIBE", new PubSubStatusDecoder());

@ -784,7 +784,7 @@ public class PublishSubscribeService {
if (scodec != null) { if (scodec != null) {
Queue<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(entry.getKey().getChannelName()); Queue<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(entry.getKey().getChannelName());
unsubscribe(entry.getKey().getChannelName(), pubSubEntry, PubSubType.SUNSUBSCRIBE); unsubscribe(entry.getKey().getChannelName(), pubSubEntry, PubSubType.SUNSUBSCRIBE);
subscribe(codec, entry.getKey().getChannelName(), listeners.toArray(new RedisPubSubListener[0])); ssubscribe(codec, entry.getKey().getChannelName(), listeners.toArray(new RedisPubSubListener[0]));
} }
Codec patternCodec = pubSubEntry.getConnection().getPatternChannels().get(entry.getKey().getChannelName()); Codec patternCodec = pubSubEntry.getConnection().getPatternChannels().get(entry.getKey().getChannelName());
@ -876,7 +876,7 @@ public class PublishSubscribeService {
return; return;
} }
log.info("listeners of '{}' channel have been resubscribed to '{}'", channelName, res); log.info("listeners of '{}' sharded-channel have been resubscribed to '{}'", channelName, res);
}); });
} }

@ -3,21 +3,26 @@ package org.redisson;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.api.NatMapper;
import org.redisson.api.RShardedTopic; import org.redisson.api.RShardedTopic;
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.StatusListener;
import org.redisson.api.redisnode.RedisCluster;
import org.redisson.api.redisnode.RedisClusterMaster;
import org.redisson.api.redisnode.RedisNodes;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.config.SubscriptionMode;
import org.redisson.misc.RedisURI;
import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -29,10 +34,7 @@ public class RedissonShardedTopicTest extends RedisDockerTest {
GenericContainer<?> redis = createRedis("6.2"); GenericContainer<?> redis = createRedis("6.2");
redis.start(); redis.start();
Config config = new Config(); Config config = createConfig(redis);
config.setProtocol(protocol);
config.useSingleServer()
.setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
RShardedTopic t = redisson.getShardedTopic("ttt"); RShardedTopic t = redisson.getShardedTopic("ttt");
@ -46,6 +48,59 @@ public class RedissonShardedTopicTest extends RedisDockerTest {
redis.stop(); redis.stop();
} }
@Test
public void testReattachInClusterMaster() {
withNewCluster(redissonClient -> {
Config cfg = redissonClient.getConfig();
cfg.useClusterServers()
.setPingConnectionInterval(0)
.setSubscriptionMode(SubscriptionMode.MASTER);
RedissonClient redisson = Redisson.create(cfg);
final AtomicBoolean executed = new AtomicBoolean();
final AtomicInteger subscriptions = new AtomicInteger();
RTopic topic = redisson.getShardedTopic("3");
topic.addListener(new StatusListener() {
@Override
public void onUnsubscribe(String channel) {
}
@Override
public void onSubscribe(String channel) {
subscriptions.incrementAndGet();
}
});
topic.addListener(Integer.class, new MessageListener<Integer>() {
@Override
public void onMessage(CharSequence channel, Integer msg) {
executed.set(true);
}
});
RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER);
for (RedisClusterMaster master : rnc.getMasters()) {
RedisClientConfig cc = new RedisClientConfig();
cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort());
RedisClient c = RedisClient.create(cc);
RedisConnection cn = c.connect();
List<String> channels = cn.sync(RedisCommands.PUBSUB_SHARDCHANNELS);
if (channels.contains("3")) {
cn.async(RedisCommands.SHUTDOWN);
}
c.shutdown();
}
Awaitility.waitAtMost(Duration.ofSeconds(30)).until(() -> subscriptions.get() == 2);
redisson.getShardedTopic("3").publish(1);
assertThat(executed.get()).isTrue();
redisson.shutdown();
});
}
@Test @Test
public void testClusterSharding() { public void testClusterSharding() {
testInCluster(redisson -> { testInCluster(redisson -> {

Loading…
Cancel
Save