Fixed - PubSub subscription in cluster sometimes doesn't apply to all nodes. #4702

pull/4536/merge
Nikita Koksharov 2 years ago
parent 5a60e7e0cb
commit 8711922aea

@ -3,10 +3,12 @@ package org.redisson.spring.data.connection;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.RedisRunner;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
@ -25,6 +27,92 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonSubscribeTest extends BaseConnectionTest {
@Test
public void testCluster() throws IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config();
config.useClusterServers()
.setPingConnectionInterval(0)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
Queue<String> names = new ConcurrentLinkedQueue<>();
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
names.add(new String(message.getBody()));
}
}, new PatternTopic("__keyevent@0__:expired"));
container.afterPropertiesSet();
container.start();
factory.getConnection().setEx("EG:test:key1".getBytes(), 3, "123".getBytes());
factory.getConnection().setEx("test:key2".getBytes(), 3, "123".getBytes());
factory.getConnection().setEx("test:key1".getBytes(), 3, "123".getBytes());
Awaitility.await().atMost(Duration.FIVE_SECONDS).untilAsserted(() -> {
assertThat(names).containsExactlyInAnyOrder("EG:test:key1", "test:key2", "test:key1");
});
redisson.shutdown();
process.shutdown();
}
@Test
public void testListenersDuplication() {
Queue<byte[]> msg = new ConcurrentLinkedQueue<>();

@ -3,10 +3,12 @@ package org.redisson.spring.data.connection;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.RedisRunner;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
@ -25,6 +27,92 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonSubscribeTest extends BaseConnectionTest {
@Test
public void testCluster() throws IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config();
config.useClusterServers()
.setPingConnectionInterval(0)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
Queue<String> names = new ConcurrentLinkedQueue<>();
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
names.add(new String(message.getBody()));
}
}, new PatternTopic("__keyevent@0__:expired"));
container.afterPropertiesSet();
container.start();
factory.getConnection().setEx("EG:test:key1".getBytes(), 3, "123".getBytes());
factory.getConnection().setEx("test:key2".getBytes(), 3, "123".getBytes());
factory.getConnection().setEx("test:key1".getBytes(), 3, "123".getBytes());
Awaitility.await().atMost(Duration.FIVE_SECONDS).untilAsserted(() -> {
assertThat(names).containsExactlyInAnyOrder("EG:test:key1", "test:key2", "test:key1");
});
redisson.shutdown();
process.shutdown();
}
@Test
public void testListenersDuplication() {
Queue<byte[]> msg = new ConcurrentLinkedQueue<>();

@ -101,8 +101,6 @@ public class PublishSubscribeService {
private final ConcurrentMap<MasterSlaveEntry, PubSubEntry> entry2PubSubConnection = new ConcurrentHashMap<>();
private final Queue<PubSubConnectionEntry> emptyQueue = new LinkedList<>();
private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
private final CountDownLatchPubSub countDownLatchPubSub = new CountDownLatchPubSub(this);
@ -169,7 +167,15 @@ public class PublishSubscribeService {
});
}
CompletableFuture<PubSubConnectionEntry> f = subscribe(PubSubType.PSUBSCRIBE, codec, channelName, getEntry(channelName), listeners);
MasterSlaveEntry entry = getEntry(channelName);
if (entry == null) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
CompletableFuture<Collection<PubSubConnectionEntry>> promise = new CompletableFuture<>();
promise.completeExceptionally(ex);
return promise;
}
CompletableFuture<PubSubConnectionEntry> f = subscribe(PubSubType.PSUBSCRIBE, codec, channelName, entry, listeners);
return f.thenApply(res -> Collections.singletonList(res));
}
@ -180,11 +186,25 @@ public class PublishSubscribeService {
}
public CompletableFuture<PubSubConnectionEntry> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) {
return subscribe(PubSubType.SUBSCRIBE, codec, channelName, getEntry(channelName), listeners);
MasterSlaveEntry entry = getEntry(channelName);
if (entry == null) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
promise.completeExceptionally(ex);
return promise;
}
return subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, listeners);
}
public CompletableFuture<PubSubConnectionEntry> ssubscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) {
return subscribe(PubSubType.SSUBSCRIBE, codec, channelName, getEntry(channelName), listeners);
MasterSlaveEntry entry = getEntry(channelName);
if (entry == null) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
promise.completeExceptionally(ex);
return promise;
}
return subscribe(PubSubType.SSUBSCRIBE, codec, channelName, entry, listeners);
}
private CompletableFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, ChannelName channelName,
@ -211,7 +231,14 @@ public class PublishSubscribeService {
public CompletableFuture<PubSubConnectionEntry> subscribeNoTimeout(Codec codec, String channelName,
AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
subscribeNoTimeout(codec, new ChannelName(channelName), getEntry(new ChannelName(channelName)), promise,
MasterSlaveEntry entry = getEntry(new ChannelName(channelName));
if (entry == null) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
promise.completeExceptionally(ex);
return promise;
}
subscribeNoTimeout(codec, new ChannelName(channelName), entry, promise,
PubSubType.SUBSCRIBE, semaphore, new AtomicInteger(), listeners);
return promise;
}
@ -406,7 +433,8 @@ public class PublishSubscribeService {
}
if (remainFreeAmount > 0) {
addFreeConnectionEntry(channelName, entry);
PubSubEntry psEntry = entry2PubSubConnection.computeIfAbsent(msEntry, e -> new PubSubEntry());
psEntry.getEntries().add(entry);
}
freePubSubLock.release();
@ -436,7 +464,11 @@ public class PublishSubscribeService {
if (type == topicType && channel.equals(channelName)) {
if (entry.release() == 1) {
MasterSlaveEntry msEntry = getEntry(channelName);
msEntry.returnPubSubConnection(entry.getConnection());
if (msEntry != null) {
msEntry.returnPubSubConnection(entry.getConnection());
} else {
entry.getConnection().closeAsync();
}
}
result.complete(null);
@ -456,7 +488,15 @@ public class PublishSubscribeService {
}
public CompletableFuture<Codec> unsubscribe(ChannelName channelName, PubSubType topicType) {
return unsubscribe(channelName, getEntry(channelName), topicType);
MasterSlaveEntry entry = getEntry(channelName);
if (entry == null) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
CompletableFuture<Codec> promise = new CompletableFuture<>();
promise.completeExceptionally(ex);
return promise;
}
return unsubscribe(channelName, entry, topicType);
}
private CompletableFuture<Codec> unsubscribe(ChannelName channelName, MasterSlaveEntry e, PubSubType topicType) {
@ -510,12 +550,6 @@ public class PublishSubscribeService {
});
}
private void addFreeConnectionEntry(ChannelName channelName, PubSubConnectionEntry entry) {
MasterSlaveEntry me = getEntry(channelName);
PubSubEntry psEntry = entry2PubSubConnection.computeIfAbsent(me, e -> new PubSubEntry());
psEntry.getEntries().add(entry);
}
public void reattachPubSub(int slot) {
name2PubSubConnection.entrySet().stream()
.filter(e -> connectionManager.calcSlot(e.getKey().getChannelName().getName()) == slot)

Loading…
Cancel
Save