diff --git a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java index 1e3da0855..74bbb6874 100644 --- a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java @@ -164,7 +164,7 @@ public class RedissonPatternTopic implements RPatternTopic { return; } - if (entry.removeAllListeners(channelName)) { + if (entry.hasListeners(channelName)) { subscribeService.punsubscribe(channelName, semaphore); } else { semaphore.release(); diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index ef162b06c..f403b16b5 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -133,7 +133,7 @@ public class RedissonTopic implements RTopic { return; } - if (entry.removeAllListeners(channelName)) { + if (entry.hasListeners(channelName)) { subscribeService.unsubscribe(channelName, semaphore).syncUninterruptibly(); } else { semaphore.release(); diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 0a261205e..517b315e9 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -445,6 +445,7 @@ public interface RedisCommands { RedisStrictCommand PUBLISH = new RedisStrictCommand("PUBLISH"); RedisCommand PUBSUB_NUMSUB = new RedisCommand("PUBSUB", "NUMSUB", new ListObjectDecoder(1)); + RedisCommand> PUBSUB_CHANNELS = new RedisStrictCommand<>("PUBSUB", "CHANNELS", new StringListReplayDecoder()); RedisCommand SUBSCRIBE = new RedisCommand("SUBSCRIBE", new PubSubStatusDecoder()); RedisCommand UNSUBSCRIBE = new RedisCommand("UNSUBSCRIBE", new PubSubStatusDecoder()); diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index ac3a5e347..0e3692396 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -663,6 +663,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } private void checkSlotsMigration(Collection newPartitions) { + Set changedSlots = new HashSet<>(); for (ClusterPartition currentPartition : getLastPartitions()) { for (ClusterPartition newPartition : newPartitions) { if (!currentPartition.getNodeId().equals(newPartition.getNodeId())) { @@ -677,6 +678,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { addedSlots.stream().forEach(slot -> { addEntry(slot, entry); lastPartitions.put(slot, currentPartition); + changedSlots.add(slot); }); if (!addedSlots.isEmpty()) { log.info("{} slots added to {}", addedSlots.cardinality(), currentPartition.getMasterAddress()); @@ -686,9 +688,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { removedSlots.andNot(newPartition.slots()); currentPartition.removeSlots(removedSlots); - removedSlots.stream().forEach(removeSlot -> { - if (lastPartitions.remove(removeSlot, currentPartition)) { - removeEntry(removeSlot); + removedSlots.stream().forEach(slot -> { + if (lastPartitions.remove(slot, currentPartition)) { + removeEntry(slot); + changedSlots.add(slot); } }); if (!removedSlots.isEmpty()) { @@ -697,6 +700,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { break; } } + + changedSlots.forEach(subscribeService::reattachPubSub); } private int indexOf(byte[] array, byte element) { diff --git a/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java b/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java index 6183930bf..161d85e58 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java +++ b/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java @@ -62,12 +62,8 @@ public class PubSubConnectionEntry { return channelListeners.containsKey(channelName); } - public Collection> getListeners(ChannelName channelName) { - Collection> result = channelListeners.get(channelName); - if (result == null) { - return Collections.emptyList(); - } - return result; + public Queue> getListeners(ChannelName channelName) { + return channelListeners.getOrDefault(channelName, EMPTY_QUEUE); } public void addListener(ChannelName channelName, RedisPubSubListener listener) { @@ -100,14 +96,6 @@ public class PubSubConnectionEntry { conn.addListener(listener); } - public boolean removeAllListeners(ChannelName channelName) { - Queue> listeners = channelListeners.get(channelName); - for (RedisPubSubListener listener : listeners) { - removeListener(channelName, listener); - } - return listeners.isEmpty(); - } - // TODO optimize public boolean removeListener(ChannelName channelName, EventListener msgListener) { Queue> listeners = channelListeners.get(channelName); @@ -188,7 +176,7 @@ public class PubSubConnectionEntry { return listener; } - public ChannelFuture unsubscribe(final ChannelName channel, final RedisPubSubListener listener) { + public ChannelFuture unsubscribe(ChannelName channel, RedisPubSubListener listener) { conn.addListener(new BaseRedisPubSubListener() { @Override public boolean onStatus(PubSubType type, CharSequence ch) { diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index d6f3a4569..f8ab3519f 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -518,6 +518,27 @@ public class PublishSubscribeService { freePubSubConnections.add(entry); } + public void reattachPubSub(int slot) { + name2PubSubConnection.entrySet().stream() + .filter(e -> connectionManager.calcSlot(e.getKey().getName()) == slot) + .forEach(entry -> { + PubSubConnectionEntry pubSubEntry = entry.getValue(); + Codec codec = pubSubEntry.getConnection().getChannels().get(entry.getKey()); + if (codec != null) { + Queue> listeners = pubSubEntry.getListeners(entry.getKey()); + unsubscribe(entry.getKey(), PubSubType.UNSUBSCRIBE); + subscribe(codec, entry.getKey(), listeners.toArray(new RedisPubSubListener[0])); + } + + Codec patternCodec = pubSubEntry.getConnection().getPatternChannels().get(entry.getKey()); + if (patternCodec != null) { + Queue> listeners = pubSubEntry.getListeners(entry.getKey()); + unsubscribe(entry.getKey(), PubSubType.PUNSUBSCRIBE); + psubscribe(entry.getKey(), patternCodec, listeners.toArray(new RedisPubSubListener[0])); + } + }); + } + public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) { for (Queue queue : freePubSubMap.values()) { for (PubSubConnectionEntry entry : queue) { diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index ec4a74fcc..59fe8942f 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -6,12 +6,7 @@ import static org.awaitility.Awaitility.await; import java.io.IOException; import java.io.Serializable; import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.UUID; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -22,6 +17,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.awaitility.Awaitility; import org.junit.After; @@ -46,12 +43,18 @@ import org.redisson.api.listener.PatternStatusListener; import org.redisson.api.listener.StatusListener; import org.redisson.client.RedisClient; import org.redisson.client.RedisClientConfig; +import org.redisson.client.RedisConnection; import org.redisson.client.RedisTimeoutException; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; +import org.redisson.client.protocol.decoder.StringListReplayDecoder; +import org.redisson.cluster.ClusterNodeInfo; import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; +import org.redisson.connection.CRC16; +import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.balancer.RandomLoadBalancer; public class RedissonTopicTest { @@ -391,6 +394,100 @@ public class RedissonTopicTest { Assert.assertTrue(l.await(5, TimeUnit.SECONDS)); } + @Test + public void testSlotMigrationInCluster() throws Exception { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave(); + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slot1) + .addNode(master2, slot2) + .addNode(master3, slot3); + ClusterProcesses process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setScanInterval(1000) + .setSubscriptionMode(SubscriptionMode.MASTER) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + RedisClientConfig cfg = new RedisClientConfig(); + cfg.setAddress(process.getNodes().iterator().next().getRedisServerAddressAndPort()); + RedisClient c = RedisClient.create(cfg); + RedisConnection cc = c.connect(); + List mastersList = cc.sync(RedisCommands.CLUSTER_NODES); + mastersList = mastersList.stream().filter(i -> i.containsFlag(ClusterNodeInfo.Flag.MASTER)).collect(Collectors.toList()); + c.shutdown(); + + ClusterNodeInfo destination = mastersList.stream().filter(i -> i.getSlotRanges().iterator().next().getStartSlot() != 10922).findAny().get(); + ClusterNodeInfo source = mastersList.stream().filter(i -> i.getSlotRanges().iterator().next().getStartSlot() == 10922).findAny().get(); + + RedisClientConfig sourceCfg = new RedisClientConfig(); + sourceCfg.setAddress(source.getAddress()); + RedisClient sourceClient = RedisClient.create(sourceCfg); + RedisConnection sourceConnection = sourceClient.connect(); + + RedisClientConfig destinationCfg = new RedisClientConfig(); + destinationCfg.setAddress(destination.getAddress()); + RedisClient destinationClient = RedisClient.create(destinationCfg); + RedisConnection destinationConnection = destinationClient.connect(); + + AtomicReference reference = new AtomicReference(); + String channelName = "test{kaO}"; + RTopic topic = redisson.getTopic(channelName); + topic.addListener(String.class, (ch, m) -> { + reference.set(m); + }); + + List destList = destinationConnection.sync(RedisCommands.PUBSUB_CHANNELS); + assertThat(destList).isEmpty(); + List sourceList = sourceConnection.sync(RedisCommands.PUBSUB_CHANNELS); + assertThat(sourceList).containsOnly(channelName); + + destinationConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "IMPORTING", source.getNodeId()); + sourceConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "MIGRATING", destination.getNodeId()); + + List keys = sourceConnection.sync(RedisCommands.CLUSTER_GETKEYSINSLOT, source.getSlotRanges().iterator().next().getStartSlot(), 100); + List params = new ArrayList(); + params.add(destination.getAddress().getHost()); + params.add(destination.getAddress().getPort()); + params.add(""); + params.add(0); + params.add(2000); + params.add("KEYS"); + params.addAll(keys); + sourceConnection.async(RedisCommands.MIGRATE, params.toArray()); + + for (ClusterNodeInfo node : mastersList) { + RedisClientConfig cc1 = new RedisClientConfig(); + cc1.setAddress(node.getAddress()); + RedisClient ccc = RedisClient.create(cc1); + RedisConnection connection = ccc.connect(); + connection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "NODE", destination.getNodeId()); + ccc.shutdownAsync(); + } + + Thread.sleep(2000); + + topic.publish("mymessage"); + Awaitility.waitAtMost(Duration.ofSeconds(1)).until(() -> reference.get().equals("mymessage")); + + List destList2 = destinationConnection.sync(RedisCommands.PUBSUB_CHANNELS); + assertThat(destList2).containsOnly(channelName); + List sourceList2 = sourceConnection.sync(RedisCommands.PUBSUB_CHANNELS); + assertThat(sourceList2).isEmpty(); + + sourceClient.shutdown(); + destinationClient.shutdown(); + redisson.shutdown(); + process.shutdown(); + } + @Test public void testUnsubscribe() throws InterruptedException { final CountDownLatch messageRecieved = new CountDownLatch(1);