From 9ca5977758583bb757a47cbe41fb87e16f131622 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 7 Sep 2018 17:43:28 +0300 Subject: [PATCH] Fixed - BlockingQueue.take method doesn't work properly after failover. #1622 --- .../connection/ClientConnectionsEntry.java | 7 ++ .../redisson/connection/MasterSlaveEntry.java | 34 ++------ .../redisson/RedissonBlockingQueueTest.java | 79 +++++++++++++++++++ 3 files changed, 92 insertions(+), 28 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index 051b2e8fe..0aac16670 100644 --- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -46,6 +46,7 @@ public class ClientConnectionsEntry { private final Queue freeSubscribeConnections = new ConcurrentLinkedQueue(); private final AsyncSemaphore freeSubscribeConnectionsCounter; + private final Queue allConnections = new ConcurrentLinkedQueue(); private final Queue freeConnections = new ConcurrentLinkedQueue(); private final AsyncSemaphore freeConnectionsCounter; @@ -167,6 +168,8 @@ public class ClientConnectionsEntry { RedisConnection conn = future.getNow(); onConnect(conn); log.debug("new connection created: {}", conn); + + allConnections.add(conn); } }); return future; @@ -215,6 +218,10 @@ public class ClientConnectionsEntry { }); return future; } + + public Queue getAllConnections() { + return allConnections; + } public Queue getAllSubscribeConnections() { return allSubscribeConnections; diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 8a9ad5665..fd8a99849 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -196,9 +196,13 @@ public class MasterSlaveEntry { entry.reset(); - closeConnections(entry); + for (RedisConnection connection : entry.getAllConnections()) { + connection.closeAsync(); + reattachBlockingQueue(connection); + } for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) { + connection.closeAsync(); connectionManager.getSubscribeService().reattachPubSub(connection); } entry.getAllSubscribeConnections().clear(); @@ -206,32 +210,6 @@ public class MasterSlaveEntry { return true; } - private void closeConnections(ClientConnectionsEntry entry) { - // close all connections - while (true) { - final RedisConnection connection = entry.pollConnection(); - if (connection == null) { - break; - } - - connection.closeAsync().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - reattachBlockingQueue(connection); - } - }); - } - - // close all pub/sub connections - while (true) { - RedisPubSubConnection connection = entry.pollSubscribeConnection(); - if (connection == null) { - break; - } - connection.closeAsync(); - } - } - private void reattachBlockingQueue(RedisConnection connection) { final CommandData commandData = connection.getCurrentCommand(); @@ -251,7 +229,7 @@ public class MasterSlaveEntry { } final RedisConnection newConnection = future.getNow(); - + final FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 2743521b0..9247be434 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -22,6 +22,7 @@ import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; import org.redisson.config.Config; +import org.redisson.connection.balancer.RandomLoadBalancer; public class RedissonBlockingQueueTest extends RedissonQueueTest { @@ -162,6 +163,84 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { runner.stop(); } + @Test + public void testFailoverInSentinel() throws Exception { + RedisRunner.RedisProcess master = new RedisRunner() + .nosave() + .randomDir() + .run(); + RedisRunner.RedisProcess slave1 = new RedisRunner() + .port(6380) + .nosave() + .randomDir() + .slaveof("127.0.0.1", 6379) + .run(); + RedisRunner.RedisProcess slave2 = new RedisRunner() + .port(6381) + .nosave() + .randomDir() + .slaveof("127.0.0.1", 6379) + .run(); + RedisRunner.RedisProcess sentinel1 = new RedisRunner() + .nosave() + .randomDir() + .port(26379) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) + .run(); + RedisRunner.RedisProcess sentinel2 = new RedisRunner() + .nosave() + .randomDir() + .port(26380) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) + .run(); + RedisRunner.RedisProcess sentinel3 = new RedisRunner() + .nosave() + .randomDir() + .port(26381) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) + .run(); + + Thread.sleep(5000); + + Config config = new Config(); + config.useSentinelServers() + .setLoadBalancer(new RandomLoadBalancer()) + .addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster"); + RedissonClient redisson = Redisson.create(config); + + RBlockingQueue queue1 = getQueue(redisson); + RFuture f = queue1.takeAsync(); + f.await(1, TimeUnit.SECONDS); + + master.stop(); + System.out.println("master " + master.getRedisServerAddressAndPort() + " stopped!"); + + Thread.sleep(TimeUnit.SECONDS.toMillis(70)); + + master = new RedisRunner() + .port(master.getRedisServerPort()) + .nosave() + .randomDir() + .run(); + + System.out.println("master " + master.getRedisServerAddressAndPort() + " started!"); + + Thread.sleep(25000); + + queue1.put(1); + assertThat(f.get()).isEqualTo(1); + + redisson.shutdown(); + sentinel1.stop(); + sentinel2.stop(); + sentinel3.stop(); + master.stop(); + slave1.stop(); + slave2.stop(); + } @Test public void testTakeReattach() throws Exception {