Fixed - Pubsub channel isn't reattached to a new master after slot migration #3077

pull/3083/head
Nikita Koksharov 4 years ago
parent 1f79c80171
commit 50cafbb622

@ -164,7 +164,7 @@ public class RedissonPatternTopic implements RPatternTopic {
return;
}
if (entry.removeAllListeners(channelName)) {
if (entry.hasListeners(channelName)) {
subscribeService.punsubscribe(channelName, semaphore);
} else {
semaphore.release();

@ -133,7 +133,7 @@ public class RedissonTopic implements RTopic {
return;
}
if (entry.removeAllListeners(channelName)) {
if (entry.hasListeners(channelName)) {
subscribeService.unsubscribe(channelName, semaphore).syncUninterruptibly();
} else {
semaphore.release();

@ -445,6 +445,7 @@ public interface RedisCommands {
RedisStrictCommand<Long> PUBLISH = new RedisStrictCommand<Long>("PUBLISH");
RedisCommand<Long> PUBSUB_NUMSUB = new RedisCommand<Long>("PUBSUB", "NUMSUB", new ListObjectDecoder<Long>(1));
RedisCommand<List<String>> PUBSUB_CHANNELS = new RedisStrictCommand<>("PUBSUB", "CHANNELS", new StringListReplayDecoder());
RedisCommand<Object> SUBSCRIBE = new RedisCommand<Object>("SUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> UNSUBSCRIBE = new RedisCommand<Object>("UNSUBSCRIBE", new PubSubStatusDecoder());

@ -663,6 +663,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
private void checkSlotsMigration(Collection<ClusterPartition> newPartitions) {
Set<Integer> changedSlots = new HashSet<>();
for (ClusterPartition currentPartition : getLastPartitions()) {
for (ClusterPartition newPartition : newPartitions) {
if (!currentPartition.getNodeId().equals(newPartition.getNodeId())) {
@ -677,6 +678,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
addedSlots.stream().forEach(slot -> {
addEntry(slot, entry);
lastPartitions.put(slot, currentPartition);
changedSlots.add(slot);
});
if (!addedSlots.isEmpty()) {
log.info("{} slots added to {}", addedSlots.cardinality(), currentPartition.getMasterAddress());
@ -686,9 +688,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
removedSlots.andNot(newPartition.slots());
currentPartition.removeSlots(removedSlots);
removedSlots.stream().forEach(removeSlot -> {
if (lastPartitions.remove(removeSlot, currentPartition)) {
removeEntry(removeSlot);
removedSlots.stream().forEach(slot -> {
if (lastPartitions.remove(slot, currentPartition)) {
removeEntry(slot);
changedSlots.add(slot);
}
});
if (!removedSlots.isEmpty()) {
@ -697,6 +700,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
break;
}
}
changedSlots.forEach(subscribeService::reattachPubSub);
}
private int indexOf(byte[] array, byte element) {

@ -62,12 +62,8 @@ public class PubSubConnectionEntry {
return channelListeners.containsKey(channelName);
}
public Collection<RedisPubSubListener<?>> getListeners(ChannelName channelName) {
Collection<RedisPubSubListener<?>> result = channelListeners.get(channelName);
if (result == null) {
return Collections.emptyList();
}
return result;
public Queue<RedisPubSubListener<?>> getListeners(ChannelName channelName) {
return channelListeners.getOrDefault(channelName, EMPTY_QUEUE);
}
public void addListener(ChannelName channelName, RedisPubSubListener<?> listener) {
@ -100,14 +96,6 @@ public class PubSubConnectionEntry {
conn.addListener(listener);
}
public boolean removeAllListeners(ChannelName channelName) {
Queue<RedisPubSubListener<?>> listeners = channelListeners.get(channelName);
for (RedisPubSubListener<?> listener : listeners) {
removeListener(channelName, listener);
}
return listeners.isEmpty();
}
// TODO optimize
public boolean removeListener(ChannelName channelName, EventListener msgListener) {
Queue<RedisPubSubListener<?>> listeners = channelListeners.get(channelName);
@ -188,7 +176,7 @@ public class PubSubConnectionEntry {
return listener;
}
public ChannelFuture unsubscribe(final ChannelName channel, final RedisPubSubListener<?> listener) {
public ChannelFuture unsubscribe(ChannelName channel, RedisPubSubListener<?> listener) {
conn.addListener(new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, CharSequence ch) {

@ -518,6 +518,27 @@ public class PublishSubscribeService {
freePubSubConnections.add(entry);
}
public void reattachPubSub(int slot) {
name2PubSubConnection.entrySet().stream()
.filter(e -> connectionManager.calcSlot(e.getKey().getName()) == slot)
.forEach(entry -> {
PubSubConnectionEntry pubSubEntry = entry.getValue();
Codec codec = pubSubEntry.getConnection().getChannels().get(entry.getKey());
if (codec != null) {
Queue<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(entry.getKey());
unsubscribe(entry.getKey(), PubSubType.UNSUBSCRIBE);
subscribe(codec, entry.getKey(), listeners.toArray(new RedisPubSubListener[0]));
}
Codec patternCodec = pubSubEntry.getConnection().getPatternChannels().get(entry.getKey());
if (patternCodec != null) {
Queue<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(entry.getKey());
unsubscribe(entry.getKey(), PubSubType.PUNSUBSCRIBE);
psubscribe(entry.getKey(), patternCodec, listeners.toArray(new RedisPubSubListener[0]));
}
});
}
public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
for (Queue<PubSubConnectionEntry> queue : freePubSubMap.values()) {
for (PubSubConnectionEntry entry : queue) {

@ -6,12 +6,7 @@ import static org.awaitility.Awaitility.await;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -22,6 +17,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.awaitility.Awaitility;
import org.junit.After;
@ -46,12 +43,18 @@ import org.redisson.api.listener.PatternStatusListener;
import org.redisson.api.listener.StatusListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.StringListReplayDecoder;
import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.CRC16;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
public class RedissonTopicTest {
@ -391,6 +394,100 @@ public class RedissonTopicTest {
Assert.assertTrue(l.await(5, TimeUnit.SECONDS));
}
@Test
public void testSlotMigrationInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slot1)
.addNode(master2, slot2)
.addNode(master3, slot3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setScanInterval(1000)
.setSubscriptionMode(SubscriptionMode.MASTER)
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedisClientConfig cfg = new RedisClientConfig();
cfg.setAddress(process.getNodes().iterator().next().getRedisServerAddressAndPort());
RedisClient c = RedisClient.create(cfg);
RedisConnection cc = c.connect();
List<ClusterNodeInfo> mastersList = cc.sync(RedisCommands.CLUSTER_NODES);
mastersList = mastersList.stream().filter(i -> i.containsFlag(ClusterNodeInfo.Flag.MASTER)).collect(Collectors.toList());
c.shutdown();
ClusterNodeInfo destination = mastersList.stream().filter(i -> i.getSlotRanges().iterator().next().getStartSlot() != 10922).findAny().get();
ClusterNodeInfo source = mastersList.stream().filter(i -> i.getSlotRanges().iterator().next().getStartSlot() == 10922).findAny().get();
RedisClientConfig sourceCfg = new RedisClientConfig();
sourceCfg.setAddress(source.getAddress());
RedisClient sourceClient = RedisClient.create(sourceCfg);
RedisConnection sourceConnection = sourceClient.connect();
RedisClientConfig destinationCfg = new RedisClientConfig();
destinationCfg.setAddress(destination.getAddress());
RedisClient destinationClient = RedisClient.create(destinationCfg);
RedisConnection destinationConnection = destinationClient.connect();
AtomicReference<String> reference = new AtomicReference();
String channelName = "test{kaO}";
RTopic topic = redisson.getTopic(channelName);
topic.addListener(String.class, (ch, m) -> {
reference.set(m);
});
List<String> destList = destinationConnection.sync(RedisCommands.PUBSUB_CHANNELS);
assertThat(destList).isEmpty();
List<String> sourceList = sourceConnection.sync(RedisCommands.PUBSUB_CHANNELS);
assertThat(sourceList).containsOnly(channelName);
destinationConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "IMPORTING", source.getNodeId());
sourceConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "MIGRATING", destination.getNodeId());
List<String> keys = sourceConnection.sync(RedisCommands.CLUSTER_GETKEYSINSLOT, source.getSlotRanges().iterator().next().getStartSlot(), 100);
List<Object> params = new ArrayList<Object>();
params.add(destination.getAddress().getHost());
params.add(destination.getAddress().getPort());
params.add("");
params.add(0);
params.add(2000);
params.add("KEYS");
params.addAll(keys);
sourceConnection.async(RedisCommands.MIGRATE, params.toArray());
for (ClusterNodeInfo node : mastersList) {
RedisClientConfig cc1 = new RedisClientConfig();
cc1.setAddress(node.getAddress());
RedisClient ccc = RedisClient.create(cc1);
RedisConnection connection = ccc.connect();
connection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "NODE", destination.getNodeId());
ccc.shutdownAsync();
}
Thread.sleep(2000);
topic.publish("mymessage");
Awaitility.waitAtMost(Duration.ofSeconds(1)).until(() -> reference.get().equals("mymessage"));
List<String> destList2 = destinationConnection.sync(RedisCommands.PUBSUB_CHANNELS);
assertThat(destList2).containsOnly(channelName);
List<String> sourceList2 = sourceConnection.sync(RedisCommands.PUBSUB_CHANNELS);
assertThat(sourceList2).isEmpty();
sourceClient.shutdown();
destinationClient.shutdown();
redisson.shutdown();
process.shutdown();
}
@Test
public void testUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);

Loading…
Cancel
Save