From d160fddd152e2e763e2d591dede54ca1f7c33830 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 28 Sep 2018 10:30:33 +0300 Subject: [PATCH] Failover handling improvements --- .../MasterSlaveConnectionManager.java | 3 +- .../redisson/connection/MasterSlaveEntry.java | 39 ++++++++++++------- .../connection/pool/ConnectionPool.java | 34 +++++++++++++--- 3 files changed, 54 insertions(+), 22 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 97d747b2c..285800204 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -506,11 +506,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected final void changeMaster(int slot, URI address) { final MasterSlaveEntry entry = getEntry(slot); - client2entry.remove(entry.getClient()); + final RedisClient oldClient = entry.getClient(); entry.changeMaster(address).addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { + client2entry.remove(oldClient); client2entry.put(entry.getClient(), entry); } } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 4da556182..eb06ce46a 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -127,18 +127,19 @@ public class MasterSlaveEntry { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { + client.shutdownAsync(); result.tryFailure(future.cause()); return; } - masterEntry = new ClientConnectionsEntry( - client, - config.getMasterConnectionMinimumIdleSize(), - config.getMasterConnectionPoolSize(), - config.getSubscriptionConnectionMinimumIdleSize(), - config.getSubscriptionConnectionPoolSize(), - connectionManager, - NodeType.MASTER); + masterEntry = new ClientConnectionsEntry( + client, + config.getMasterConnectionMinimumIdleSize(), + config.getMasterConnectionPoolSize(), + config.getSubscriptionConnectionMinimumIdleSize(), + config.getSubscriptionConnectionPoolSize(), + connectionManager, + NodeType.MASTER); int counter = 1; if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { @@ -146,13 +147,13 @@ public class MasterSlaveEntry { } CountableListener listener = new CountableListener(result, client, counter); - RFuture writeFuture = writeConnectionPool.add(masterEntry); - writeFuture.addListener(listener); - - if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { - RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); - pubSubFuture.addListener(listener); - } + RFuture writeFuture = writeConnectionPool.add(masterEntry); + writeFuture.addListener(listener); + + if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { + RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); + pubSubFuture.addListener(listener); + } } }); @@ -304,6 +305,14 @@ public class MasterSlaveEntry { } } RFuture addFuture = slaveBalancer.add(entry); + addFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + client.shutdownAsync(); + } + } + }); addFuture.addListener(new TransferListener(result)); } }); diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index ecca1bf58..6dbe5890b 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -73,7 +73,9 @@ abstract class ConnectionPool { promise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - entries.add(entry); + if (future.isSuccess()) { + entries.add(entry); + } } }); initConnections(entry, promise, true); @@ -119,13 +121,34 @@ abstract class ConnectionPool { public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { T conn = future.getNow(); - - releaseConnection(entry, conn); + if (!initPromise.isDone()) { + releaseConnection(entry, conn); + } else { + conn.closeAsync(); + } } releaseConnection(entry); if (!future.isSuccess()) { + if (initPromise.isDone()) { + return; + } + + for (RedisConnection connection : entry.getAllConnections()) { + if (!connection.isClosed()) { + connection.closeAsync(); + } + } + entry.getAllConnections().clear(); + + for (RedisConnection connection : entry.getAllSubscribeConnections()) { + if (!connection.isClosed()) { + connection.closeAsync(); + } + } + entry.getAllSubscribeConnections().clear(); + int totalInitializedConnections = minimumIdleSize - initializedConnections.get(); String errorMsg; if (totalInitializedConnections == 0) { @@ -141,9 +164,8 @@ abstract class ConnectionPool { int value = initializedConnections.decrementAndGet(); if (value == 0) { - log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr()); - if (!initPromise.trySuccess(null)) { - throw new IllegalStateException(); + if (initPromise.trySuccess(null)) { + log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr()); } } else if (value > 0 && !initPromise.isDone()) { if (requests.incrementAndGet() <= minimumIdleSize) {