Fixed - Blocking commands connected to Redis Cluster aren't resubscribed after Master node failover. #2832

pull/2853/head
Nikita Koksharov 5 years ago
parent e58f84fa93
commit 82fbf7694b

@ -20,7 +20,6 @@ import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand;
@ -41,6 +40,7 @@ import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -204,7 +204,7 @@ public class MasterSlaveEntry {
for (RedisConnection connection : entry.getAllConnections()) {
connection.closeAsync();
reattachBlockingQueue(connection);
reattachBlockingQueue(connection.getCurrentCommand());
}
while (true) {
RedisConnection connection = entry.pollConnection();
@ -229,35 +229,54 @@ public class MasterSlaveEntry {
return true;
}
private void reattachBlockingQueue(RedisConnection connection) {
CommandData<?, ?> commandData = connection.getCurrentCommand();
private void reattachBlockingQueue(CommandData<?, ?> commandData) {
if (commandData == null
|| !commandData.isBlockingCommand()
|| commandData.getPromise().isDone()) {
return;
}
RFuture<RedisConnection> newConnectionFuture = connectionWriteOp(commandData.getCommand());
String key = null;
for (int i = 0; i < commandData.getParams().length; i++) {
Object param = commandData.getParams()[i];
if ("STREAMS".equals(param)) {
key = (String) commandData.getParams()[i+1];
break;
}
}
if (key == null) {
key = (String) commandData.getParams()[0];
}
MasterSlaveEntry entry = connectionManager.getEntry(key);
if (entry == null) {
connectionManager.newTimeout(timeout ->
reattachBlockingQueue(commandData), 1, TimeUnit.SECONDS);
return;
}
RFuture<RedisConnection> newConnectionFuture = entry.connectionWriteOp(commandData.getCommand());
newConnectionFuture.onComplete((newConnection, e) -> {
if (e != null) {
log.error("Can't resubscribe blocking queue " + commandData, e);
connectionManager.newTimeout(timeout ->
reattachBlockingQueue(commandData), 1, TimeUnit.SECONDS);
return;
}
if (commandData.getPromise().isDone()) {
releaseWrite(newConnection);
entry.releaseWrite(newConnection);
return;
}
ChannelFuture channelFuture = newConnection.send(commandData);
channelFuture.addListener(future -> {
if (!future.isSuccess()) {
commandData.getPromise().tryFailure(new RedisException("Can't resubscribe blocking queue " + commandData + " to " + newConnection));
connectionManager.newTimeout(timeout ->
reattachBlockingQueue(commandData), 1, TimeUnit.SECONDS);
}
});
commandData.getPromise().onComplete((r, ex) -> {
releaseWrite(newConnection);
entry.releaseWrite(newConnection);
});
});
}

@ -4,14 +4,9 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -24,8 +19,12 @@ import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.redisnode.RedisClusterMaster;
import org.redisson.api.redisnode.RedisNodes;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RedissonBlockingQueueTest extends RedissonQueueTest {
@ -167,47 +166,102 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
}
@Test
public void testFailoverInSentinel() throws Exception {
public void testTakeReattachCluster() throws IOException, InterruptedException, TimeoutException, ExecutionException {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave();
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Thread.sleep(1000);
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get();
List<RFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
RBlockingQueue<Integer> queue = redisson.getBlockingQueue("queue" + i);
RFuture<Integer> f = queue.takeAsync();
f.await(1, TimeUnit.SECONDS);
futures.add(f);
}
master.stop();
Thread.sleep(TimeUnit.SECONDS.toMillis(80));
for (int i = 0; i < 10; i++) {
RBlockingQueue<Integer> queue = redisson.getBlockingQueue("queue" + i);
queue.put(i*100);
}
for (int i = 0; i < 10; i++) {
RFuture<Integer> f = futures.get(i);
f.await(20, TimeUnit.SECONDS);
if (f.cause() != null) {
f.cause().printStackTrace();
}
Integer result = f.getNow();
assertThat(result).isEqualTo(i*100);
}
redisson.shutdown();
process.shutdown();
}
@Test
public void testTakeReattachSentinel() throws IOException, InterruptedException, TimeoutException, ExecutionException {
RedisRunner.RedisProcess master = new RedisRunner()
.nosave()
.randomDir()
.randomPort()
.run();
RedisRunner.RedisProcess slave1 = new RedisRunner()
.randomPort()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", master.getRedisServerPort())
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess slave2 = new RedisRunner()
.randomPort()
.port(6381)
.nosave()
.randomDir()
.slaveof("127.0.0.1", master.getRedisServerPort())
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.randomPort()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", master.getRedisServerPort(), 2)
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.randomPort()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", master.getRedisServerPort(), 2)
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.randomPort()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", master.getRedisServerPort(), 2)
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
Thread.sleep(5000);
Thread.sleep(1000);
Config config = new Config();
config.useSentinelServers()
@ -220,22 +274,19 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
f.await(1, TimeUnit.SECONDS);
master.stop();
System.out.println("master " + master.getRedisServerAddressAndPort() + " stopped!");
Thread.sleep(TimeUnit.SECONDS.toMillis(70));
master = new RedisRunner()
.port(master.getRedisServerPort())
.nosave()
.randomDir()
.run();
Thread.sleep(TimeUnit.SECONDS.toMillis(60));
System.out.println("master " + master.getRedisServerAddressAndPort() + " started!");
queue1.put(123);
Thread.sleep(25000);
// check connection rotation
for (int i = 0; i < 10; i++) {
queue1.put(i + 10000);
}
assertThat(queue1.size()).isEqualTo(10);
queue1.put(1);
assertThat(f.get()).isEqualTo(1);
Integer result = f.get(80, TimeUnit.SECONDS);
assertThat(result).isEqualTo(123);
redisson.shutdown();
sentinel1.stop();
@ -244,6 +295,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
master.stop();
slave1.stop();
slave2.stop();
}
@Test
@ -257,6 +309,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
Config config = new Config();
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RBlockingQueue<Integer> queue1 = getQueue(redisson);
RFuture<Integer> f = queue1.takeAsync();
f.await(1, TimeUnit.SECONDS);

Loading…
Cancel
Save