diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 4ed2cba4d..20d1f28be 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -582,7 +582,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { for (Integer slot : removedSlots) { MasterSlaveEntry entry = removeEntry(slot); if (entry.getSlotRanges().isEmpty()) { - entry.shutdownMasterAsync(); + entry.shutdownAsync(); log.info("{} master and slaves for it removed", entry.getClient().getAddr()); } } diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index 4de8b004d..21e5e47fc 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -67,8 +67,6 @@ public interface ConnectionManager { IdleConnectionWatcher getConnectionWatcher(); - void shutdownAsync(RedisClient client); - int calcSlot(String key); MasterSlaveServersConfig getConfig(); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 622744c9a..7dc2f0b19 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -46,11 +46,11 @@ import org.redisson.config.BaseMasterSlaveServersConfig; import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.TransportMode; +import org.redisson.misc.CountableListener; import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.misc.URIBuilder; -import org.redisson.pubsub.AsyncSemaphore; import org.redisson.pubsub.PublishSubscribeService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -435,11 +435,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return client; } - @Override - public void shutdownAsync(RedisClient client) { - client.shutdownAsync(); - } - @Override public RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname) { RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname); @@ -632,17 +627,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { if (dnsMonitor != null) { dnsMonitor.stop(); } + resolverGroup.close(); - timer.stop(); - - shutdownLatch.close(); - shutdownPromise.trySuccess(true); - shutdownLatch.awaitUninterruptibly(); - - for (MasterSlaveEntry entry : getEntrySet()) { - entry.shutdown(); - } - if (cfg.getExecutor() == null) { executor.shutdown(); try { @@ -651,8 +637,20 @@ public class MasterSlaveConnectionManager implements ConnectionManager { Thread.currentThread().interrupt(); } } + + timer.stop(); - resolverGroup.close(); + shutdownLatch.close(); + shutdownPromise.trySuccess(true); + shutdownLatch.awaitUninterruptibly(); + + RPromise result = new RedissonPromise(); + CountableListener listener = new CountableListener(result, null, getEntrySet().size()); + for (MasterSlaveEntry entry : getEntrySet()) { + entry.shutdownAsync().addListener(listener); + } + + result.awaitUninterruptibly(timeout, unit); if (cfg.getEventLoopGroup() == null) { group.shutdownGracefully(quietPeriod, timeout, unit).syncUninterruptibly(); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 304294bec..c3ddcff21 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -139,25 +139,28 @@ public class MasterSlaveEntry { return; } - masterEntry = new ClientConnectionsEntry( - client, - config.getMasterConnectionMinimumIdleSize(), - config.getMasterConnectionPoolSize(), - config.getSubscriptionConnectionMinimumIdleSize(), - config.getSubscriptionConnectionPoolSize(), - connectionManager, - NodeType.MASTER); - - CountableListener listener = new CountableListener(result, client); - RFuture writeFuture = writeConnectionPool.add(masterEntry); - listener.incCounter(); - writeFuture.addListener(listener); - - if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { - RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); - listener.incCounter(); - pubSubFuture.addListener(listener); - } + masterEntry = new ClientConnectionsEntry( + client, + config.getMasterConnectionMinimumIdleSize(), + config.getMasterConnectionPoolSize(), + config.getSubscriptionConnectionMinimumIdleSize(), + config.getSubscriptionConnectionPoolSize(), + connectionManager, + NodeType.MASTER); + + int counter = 1; + if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { + counter++; + } + + CountableListener listener = new CountableListener(result, client, counter); + RFuture writeFuture = writeConnectionPool.add(masterEntry); + writeFuture.addListener(listener); + + if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { + RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); + pubSubFuture.addListener(listener); + } } }); @@ -465,19 +468,22 @@ public class MasterSlaveEntry { && slaveBalancer.getAvailableClients() > 1) { slaveDown(newMasterClient.getAddr(), FreezeReason.SYSTEM); } - connectionManager.shutdownAsync(oldMaster.getClient()); + oldMaster.getClient().shutdownAsync(); log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), masterEntry.getClient().getAddr()); } }); } - public void shutdownMasterAsync() { + public RFuture shutdownAsync() { if (!active.compareAndSet(true, false)) { - return; + return RedissonPromise.newSucceededFuture(null); } - connectionManager.shutdownAsync(masterEntry.getClient()); - slaveBalancer.shutdownAsync(); + RPromise result = new RedissonPromise(); + CountableListener listener = new CountableListener(result, null, 2); + masterEntry.getClient().shutdownAsync().addListener(listener); + slaveBalancer.shutdownAsync().addListener(listener); + return result; } public RFuture connectionWriteOp(RedisCommand command) { @@ -526,15 +532,6 @@ public class MasterSlaveEntry { slaveBalancer.returnConnection(connection); } - public void shutdown() { - if (!active.compareAndSet(true, false)) { - return; - } - - masterEntry.getClient().shutdown(); - slaveBalancer.shutdown(); - } - public void addSlotRange(Integer range) { slots.add(range); } diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index af5f51597..a83b7cd99 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -83,19 +83,17 @@ public class LoadBalancerManager { public RFuture add(final ClientConnectionsEntry entry) { RPromise result = new RedissonPromise(); - CountableListener listener = new CountableListener(result, null) { + CountableListener listener = new CountableListener(result, null, 2) { @Override protected void onSuccess(Void value) { - client2Entry.put(entry.getClient(), entry); - } + client2Entry.put(entry.getClient(), entry); + } }; RFuture slaveFuture = slaveConnectionPool.add(entry); - listener.incCounter(); slaveFuture.addListener(listener); RFuture pubSubFuture = pubSubConnectionPool.add(entry); - listener.incCounter(); pubSubFuture.addListener(listener); return result; } @@ -249,16 +247,16 @@ public class LoadBalancerManager { slaveConnectionPool.returnConnection(entry, connection); } - public void shutdown() { - for (ClientConnectionsEntry entry : client2Entry.values()) { - entry.getClient().shutdown(); + public RFuture shutdownAsync() { + if (client2Entry.values().isEmpty()) { + return RedissonPromise.newSucceededFuture(null); } - } - - public void shutdownAsync() { + RPromise result = new RedissonPromise(); + CountableListener listener = new CountableListener(result, null, client2Entry.values().size()); for (ClientConnectionsEntry entry : client2Entry.values()) { - connectionManager.shutdownAsync(entry.getClient()); + entry.getClient().shutdownAsync().addListener(listener); } + return result; } } diff --git a/redisson/src/main/java/org/redisson/misc/CountableListener.java b/redisson/src/main/java/org/redisson/misc/CountableListener.java index ade47d5ac..81a4695d9 100644 --- a/redisson/src/main/java/org/redisson/misc/CountableListener.java +++ b/redisson/src/main/java/org/redisson/misc/CountableListener.java @@ -36,19 +36,19 @@ public class CountableListener implements FutureListener { } public CountableListener(RPromise result, T value) { - super(); + this(null, null, 0); + } + + public CountableListener(RPromise result, T value, int count) { this.result = result; this.value = value; + this.counter.set(count); } public void setCounter(int newValue) { counter.set(newValue); } - public void incCounter() { - counter.incrementAndGet(); - } - public void decCounter() { if (counter.decrementAndGet() == 0) { onSuccess(value); diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java index 31b50c692..34b55a997 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java @@ -470,11 +470,9 @@ public class RedissonTransaction implements RTransaction { } final CountableListener> listener = - new CountableListener>(result, hashes); - listener.setCounter(hashes.size()); + new CountableListener>(result, hashes, hashes.size()); RPromise subscriptionFuture = new RedissonPromise(); - final CountableListener subscribedFutures = new CountableListener(subscriptionFuture, null); - subscribedFutures.setCounter(hashes.size()); + final CountableListener subscribedFutures = new CountableListener(subscriptionFuture, null, hashes.size()); final List> topics = new ArrayList>(); for (final Entry entry : hashes.entrySet()) {