diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index a5fa7a4a0..2a8f10b9d 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -20,7 +20,6 @@ import org.redisson.api.NodeType; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; -import org.redisson.client.RedisException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.RedisCommand; @@ -41,6 +40,7 @@ import java.net.InetSocketAddress; import java.util.Collection; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -204,7 +204,7 @@ public class MasterSlaveEntry { for (RedisConnection connection : entry.getAllConnections()) { connection.closeAsync(); - reattachBlockingQueue(connection); + reattachBlockingQueue(connection.getCurrentCommand()); } while (true) { RedisConnection connection = entry.pollConnection(); @@ -229,35 +229,54 @@ public class MasterSlaveEntry { return true; } - private void reattachBlockingQueue(RedisConnection connection) { - CommandData commandData = connection.getCurrentCommand(); - - if (commandData == null + private void reattachBlockingQueue(CommandData commandData) { + if (commandData == null || !commandData.isBlockingCommand() || commandData.getPromise().isDone()) { return; } - RFuture newConnectionFuture = connectionWriteOp(commandData.getCommand()); + String key = null; + for (int i = 0; i < commandData.getParams().length; i++) { + Object param = commandData.getParams()[i]; + if ("STREAMS".equals(param)) { + key = (String) commandData.getParams()[i+1]; + break; + } + } + if (key == null) { + key = (String) commandData.getParams()[0]; + } + + MasterSlaveEntry entry = connectionManager.getEntry(key); + if (entry == null) { + connectionManager.newTimeout(timeout -> + reattachBlockingQueue(commandData), 1, TimeUnit.SECONDS); + return; + } + + RFuture newConnectionFuture = entry.connectionWriteOp(commandData.getCommand()); newConnectionFuture.onComplete((newConnection, e) -> { if (e != null) { - log.error("Can't resubscribe blocking queue " + commandData, e); + connectionManager.newTimeout(timeout -> + reattachBlockingQueue(commandData), 1, TimeUnit.SECONDS); return; } if (commandData.getPromise().isDone()) { - releaseWrite(newConnection); + entry.releaseWrite(newConnection); return; } ChannelFuture channelFuture = newConnection.send(commandData); channelFuture.addListener(future -> { if (!future.isSuccess()) { - commandData.getPromise().tryFailure(new RedisException("Can't resubscribe blocking queue " + commandData + " to " + newConnection)); + connectionManager.newTimeout(timeout -> + reattachBlockingQueue(commandData), 1, TimeUnit.SECONDS); } }); commandData.getPromise().onComplete((r, ex) -> { - releaseWrite(newConnection); + entry.releaseWrite(newConnection); }); }); } diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index a7aeee9f9..ad5d7f458 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -4,14 +4,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -24,8 +19,12 @@ import org.redisson.RedisRunner.RedisProcess; import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; +import org.redisson.api.redisnode.RedisClusterMaster; +import org.redisson.api.redisnode.RedisNodes; import org.redisson.config.Config; import org.redisson.connection.balancer.RandomLoadBalancer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RedissonBlockingQueueTest extends RedissonQueueTest { @@ -167,76 +166,128 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { } @Test - public void testFailoverInSentinel() throws Exception { + public void testTakeReattachCluster() throws IOException, InterruptedException, TimeoutException, ExecutionException { + RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); + RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); + RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + ClusterProcesses process = clusterRunner.run(); + + Thread.sleep(1000); + + Config config = new Config(); + config.useClusterServers() + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get(); + + List> futures = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + RBlockingQueue queue = redisson.getBlockingQueue("queue" + i); + RFuture f = queue.takeAsync(); + f.await(1, TimeUnit.SECONDS); + futures.add(f); + } + + master.stop(); + + Thread.sleep(TimeUnit.SECONDS.toMillis(80)); + + for (int i = 0; i < 10; i++) { + RBlockingQueue queue = redisson.getBlockingQueue("queue" + i); + queue.put(i*100); + } + + for (int i = 0; i < 10; i++) { + RFuture f = futures.get(i); + f.await(20, TimeUnit.SECONDS); + if (f.cause() != null) { + f.cause().printStackTrace(); + } + Integer result = f.getNow(); + assertThat(result).isEqualTo(i*100); + } + + redisson.shutdown(); + process.shutdown(); + } + + @Test + public void testTakeReattachSentinel() throws IOException, InterruptedException, TimeoutException, ExecutionException { RedisRunner.RedisProcess master = new RedisRunner() .nosave() .randomDir() - .randomPort() .run(); RedisRunner.RedisProcess slave1 = new RedisRunner() - .randomPort() + .port(6380) .nosave() .randomDir() - .slaveof("127.0.0.1", master.getRedisServerPort()) + .slaveof("127.0.0.1", 6379) .run(); RedisRunner.RedisProcess slave2 = new RedisRunner() - .randomPort() + .port(6381) .nosave() .randomDir() - .slaveof("127.0.0.1", master.getRedisServerPort()) + .slaveof("127.0.0.1", 6379) .run(); RedisRunner.RedisProcess sentinel1 = new RedisRunner() .nosave() .randomDir() - .randomPort() + .port(26379) .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", master.getRedisServerPort(), 2) + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) .run(); RedisRunner.RedisProcess sentinel2 = new RedisRunner() .nosave() .randomDir() - .randomPort() + .port(26380) .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", master.getRedisServerPort(), 2) + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) .run(); RedisRunner.RedisProcess sentinel3 = new RedisRunner() .nosave() .randomDir() - .randomPort() + .port(26381) .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", master.getRedisServerPort(), 2) + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) .run(); - - Thread.sleep(5000); - + + Thread.sleep(1000); + Config config = new Config(); config.useSentinelServers() .setLoadBalancer(new RandomLoadBalancer()) .addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster"); RedissonClient redisson = Redisson.create(config); - + RBlockingQueue queue1 = getQueue(redisson); RFuture f = queue1.takeAsync(); f.await(1, TimeUnit.SECONDS); - + master.stop(); - System.out.println("master " + master.getRedisServerAddressAndPort() + " stopped!"); - - Thread.sleep(TimeUnit.SECONDS.toMillis(70)); - - master = new RedisRunner() - .port(master.getRedisServerPort()) - .nosave() - .randomDir() - .run(); - System.out.println("master " + master.getRedisServerAddressAndPort() + " started!"); - - Thread.sleep(25000); - - queue1.put(1); - assertThat(f.get()).isEqualTo(1); - + Thread.sleep(TimeUnit.SECONDS.toMillis(60)); + + queue1.put(123); + + // check connection rotation + for (int i = 0; i < 10; i++) { + queue1.put(i + 10000); + } + assertThat(queue1.size()).isEqualTo(10); + + Integer result = f.get(80, TimeUnit.SECONDS); + assertThat(result).isEqualTo(123); + redisson.shutdown(); sentinel1.stop(); sentinel2.stop(); @@ -244,8 +295,9 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { master.stop(); slave1.stop(); slave2.stop(); + } - + @Test public void testTakeReattach() throws Exception { RedisProcess runner = new RedisRunner() @@ -257,6 +309,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { Config config = new Config(); config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort()); RedissonClient redisson = Redisson.create(config); + RBlockingQueue queue1 = getQueue(redisson); RFuture f = queue1.takeAsync(); f.await(1, TimeUnit.SECONDS);