Fixed - RTopic and RShardedTopic fail to resubscribe after node's DNS record change. #6023

pull/6329/head
Nikita Koksharov 2 months ago
parent 000fe9685d
commit df58bf38c7

@ -669,6 +669,10 @@ public class PublishSubscribeService {
CompletableFuture<Void> unsubscribeLocked(PubSubType topicType, ChannelName channelName, PubSubConnectionEntry ce) {
remove(channelName, ce);
if (connectionManager.getServiceManager().isShuttingDown()) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> result = new CompletableFuture<>();
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@ -699,11 +703,13 @@ public class PublishSubscribeService {
name2PubSubConnection.remove(new PubSubKey(channelName, entry.getEntry()));
ClientConnectionsEntry e = entry.getEntry().getEntry(entry.getConnection().getRedisClient());
Tuple<ChannelName, ClientConnectionsEntry> key = new Tuple<>(channelName, e);
key2connection.remove(key);
if (e.getTrackedConnectionsHolder().decUsage() == 0) {
e.getTrackedConnectionsHolder().reset();
trackedEntries.remove(entry);
if (e != null) {
Tuple<ChannelName, ClientConnectionsEntry> key = new Tuple<>(channelName, e);
key2connection.remove(key);
if (e.getTrackedConnectionsHolder().decUsage() == 0) {
e.getTrackedConnectionsHolder().reset();
trackedEntries.remove(entry);
}
}
name2entry.computeIfPresent(channelName, (name, entries) -> {
@ -830,6 +836,10 @@ public class PublishSubscribeService {
}
subscribeCodecFuture.whenComplete((subscribeCodec, e) -> {
if (e != null) {
log.error(e.getMessage(), e);
return;
}
if (subscribeCodec == null) {
return;
}

@ -1,5 +1,11 @@
package org.redisson;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.dns.DnsServerAddressStreamProvider;
import io.netty.resolver.dns.DnsServerAddresses;
import io.netty.util.internal.SocketUtils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
@ -20,13 +26,17 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.SequentialDnsAddressResolverFactory;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.RedisURI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
@ -847,6 +857,61 @@ public class RedissonTopicTest extends RedisDockerTest {
redis.stop();
}
@Test
public void testHostnameChange() throws Exception {
GenericContainer<?> redis = createRedis();
redis.start();
SimpleDnsServer s = new SimpleDnsServer();
Config config = new Config();
config.setAddressResolverGroupFactory(new SequentialDnsAddressResolverFactory() {
@Override
public AddressResolverGroup<InetSocketAddress> create(Class<? extends DatagramChannel> channelType, Class<? extends SocketChannel> socketChannelType, DnsServerAddressStreamProvider nameServerProvider) {
return super.create(channelType, socketChannelType, hostname ->
DnsServerAddresses.singleton(SocketUtils.socketAddress("127.0.0.1", 55)).stream());
}
});
config.useSingleServer()
.setDnsMonitoringInterval(1000)
.setAddress("redis://simplehost:" + redis.getFirstMappedPort());
RedissonClient redisson = Redisson.create(config);
RTopic topic = redisson.getTopic("topic");
Logger logger = LoggerFactory.getLogger("out");
for (int i = 0; i < 10; i++) {
Set<String> messages = new HashSet<>();
topic.addListener(String.class, new MessageListener<String>() {
@Override
public void onMessage(CharSequence channel, String msg) {
messages.add(msg);
}
});
if (i == 1) {
s.updateIP("127.0.0.2");
}
for (int j = 0; j < 60; j++) {
topic.publish("test" + j);
Thread.sleep(100);
}
assertThat(messages.size()).isEqualTo(60);
topic.removeAllListeners();
logger.info("step1 " + i);
}
redisson.shutdown();
s.stop();
redis.stop();
}
// @Test
public void testReattachInSentinelLong() throws Exception {
@ -961,6 +1026,7 @@ public class RedissonTopicTest extends RedisDockerTest {
final AtomicInteger subscriptions = new AtomicInteger();
RedissonClient redisson = Redisson.create(config);
RTopic topic = redisson.getTopic("topic");
topic.addListener(new StatusListener() {
@ -988,7 +1054,7 @@ public class RedissonTopicTest extends RedisDockerTest {
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
nodes.forEach(n -> n.start());

@ -11,8 +11,8 @@ import java.net.InetAddress;
public class SimpleDnsServer {
EventLoopGroup group = new NioEventLoopGroup();
private Channel channel;
private final EventLoopGroup group = new NioEventLoopGroup();
private final Channel channel;
private String ip = "127.0.0.1";
public SimpleDnsServer() throws InterruptedException {
@ -28,7 +28,7 @@ public class SimpleDnsServer {
}
});
ChannelFuture future = bootstrap.bind(55).sync(); // Bind to port 53 for DNS
ChannelFuture future = bootstrap.bind(55).sync();
channel = future.channel();
}
@ -47,9 +47,6 @@ public class SimpleDnsServer {
DefaultDnsQuestion question = query.recordAt(DnsSection.QUESTION);
String requestedDomain = question.name();
// System.out.println("Received DNS query for: " + requestedDomain + " " + query);
// Create a response with a dummy IP address
DatagramDnsResponse response = new DatagramDnsResponse(query.recipient(), query.sender(), query.id());
response.addRecord(DnsSection.QUESTION, question);
response.addRecord(DnsSection.ANSWER, new DefaultDnsRawRecord(question.name(), DnsRecordType.A, 0,

Loading…
Cancel
Save