From ce060ef6513795cce3da62f5cee5febfbb89aced Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 26 Nov 2015 15:59:18 +0300 Subject: [PATCH] Waiting for connection pool init at startup --- .../cluster/ClusterConnectionManager.java | 39 +++++++++++++------ .../MasterSlaveConnectionManager.java | 12 +++++- .../redisson/connection/MasterSlaveEntry.java | 29 ++++++++------ .../connection/SingleConnectionManager.java | 4 +- .../org/redisson/connection/SingleEntry.java | 26 +++++++++---- .../balancer/LoadBalancerManager.java | 2 +- .../balancer/LoadBalancerManagerImpl.java | 15 +++++-- .../org/redisson/misc/ConnectionPool.java | 10 ++++- 8 files changed, 98 insertions(+), 39 deletions(-) diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 0db04584c..c8c49fa8b 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -18,6 +18,7 @@ package org.redisson.cluster; import java.net.URI; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -40,6 +41,8 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.ScheduledFuture; @@ -70,7 +73,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { Collection partitions = parsePartitions(nodesValue); for (ClusterPartition partition : partitions) { - addMasterEntry(partition, cfg); + Collection> s = addMasterEntry(partition, cfg); + for (Future future : s) { + future.syncUninterruptibly(); + } } break; @@ -108,20 +114,23 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { protected void initEntry(MasterSlaveServersConfig config) { } - private void addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) { + private Collection> addMasterEntry(final ClusterPartition partition, ClusterServersConfig cfg) { if (partition.isMasterFail()) { log.warn("add master: {} for slot ranges: {} failed. Reason - server has FAIL flag", partition.getMasterAddress(), partition.getSlotRanges()); - return; + Future f = newSucceededFuture(); + return Collections.singletonList(f); } RedisConnection connection = connect(cfg, partition.getMasterAddress()); if (connection == null) { - return; + Future f = newSucceededFuture(); + return Collections.singletonList(f); } Map params = connection.sync(RedisCommands.CLUSTER_INFO); if ("fail".equals(params.get("cluster_state"))) { log.warn("add master: {} for slot ranges: {} failed. Reason - cluster_state:fail", partition.getMasterAddress(), partition.getSlotRanges()); - return; + Future f = newSucceededFuture(); + return Collections.singletonList(f); } MasterSlaveServersConfig config = create(cfg); @@ -131,12 +140,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges()); - MasterSlaveEntry entry = new MasterSlaveEntry(partition.getSlotRanges(), this, config, connectListener); - entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); - for (ClusterSlotRange slotRange : partition.getSlotRanges()) { - addEntry(slotRange, entry); - lastPartitions.put(slotRange, partition); - } + final MasterSlaveEntry entry = new MasterSlaveEntry(partition.getSlotRanges(), this, config, connectListener); + List> fs = entry.initSlaveBalancer(config); + Future f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); + f.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + for (ClusterSlotRange slotRange : partition.getSlotRanges()) { + addEntry(slotRange, entry); + lastPartitions.put(slotRange, partition); + } + } + }); + fs.add(f); + return fs; } private void monitorClusterChange(final ClusterServersConfig cfg) { diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 552e6abc1..4cd513cc9 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -172,7 +173,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager { HashSet slots = new HashSet(); slots.add(singleSlotRange); MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config, connectListener); - entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); + List> fs = entry.initSlaveBalancer(config); + Future f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); + fs.add(f); + for (Future future : fs) { + future.syncUninterruptibly(); + } addEntry(singleSlotRange, entry); } @@ -677,6 +683,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return group.next().newPromise(); } + public Future newSucceededFuture() { + return group.next().newSucceededFuture(null); + } + @Override public Future newFailedFuture(Throwable cause) { return group.next().newFailedFuture(cause); diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 1f6f966d8..0f4d59b44 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -17,7 +17,10 @@ package org.redisson.connection; import java.net.InetSocketAddress; import java.net.URI; +import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedList; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -66,25 +69,27 @@ public class MasterSlaveEntry { this.connectListener = connectListener; slaveBalancer = new LoadBalancerManagerImpl(config, connectionManager, this); - - initSlaveBalancer(config); - writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this); } - protected void initSlaveBalancer(MasterSlaveServersConfig config) { + public List> initSlaveBalancer(MasterSlaveServersConfig config) { boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty(); - addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER); + + List> result = new LinkedList>(); + Future f = addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER); + result.add(f); for (URI address : config.getSlaveAddresses()) { - addSlave(address.getHost(), address.getPort(), false, NodeType.SLAVE); + f = addSlave(address.getHost(), address.getPort(), false, NodeType.SLAVE); + result.add(f); } + return result; } - public void setupMasterEntry(String host, int port) { + public Future setupMasterEntry(String host, int port) { RedisClient client = connectionManager.createClient(host, port); masterEntry = new ClientConnectionsEntry(client, config.getMasterConnectionMinimumIdleSize(), config.getMasterConnectionPoolSize(), 0, 0, connectListener, NodeType.MASTER, connectionManager.getConnectionWatcher(), config); - writeConnectionHolder.add(masterEntry); + return writeConnectionHolder.add(masterEntry); } public Collection slaveDown(String host, int port, FreezeReason freezeReason) { @@ -98,11 +103,11 @@ public class MasterSlaveEntry { return conns; } - public void addSlave(String host, int port) { - addSlave(host, port, true, NodeType.SLAVE); + public Future addSlave(String host, int port) { + return addSlave(host, port, true, NodeType.SLAVE); } - private void addSlave(String host, int port, boolean freezed, NodeType mode) { + private Future addSlave(String host, int port, boolean freezed, NodeType mode) { RedisClient client = connectionManager.createClient(host, port); ClientConnectionsEntry entry = new ClientConnectionsEntry(client, this.config.getSlaveConnectionMinimumIdleSize(), @@ -113,7 +118,7 @@ public class MasterSlaveEntry { entry.setFreezed(freezed); entry.setFreezeReason(FreezeReason.SYSTEM); } - slaveBalancer.add(entry); + return slaveBalancer.add(entry); } public RedisClient getClient() { diff --git a/src/main/java/org/redisson/connection/SingleConnectionManager.java b/src/main/java/org/redisson/connection/SingleConnectionManager.java index 58d80e244..5891279cf 100644 --- a/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -29,6 +29,7 @@ import org.redisson.cluster.ClusterSlotRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.ScheduledFuture; @@ -82,7 +83,8 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { HashSet slots = new HashSet(); slots.add(singleSlotRange); SingleEntry entry = new SingleEntry(slots, this, config, connectListener); - entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); + Future f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); + f.syncUninterruptibly(); addEntry(singleSlotRange, entry); } diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index b615036b8..ce5097ee6 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -17,6 +17,7 @@ package org.redisson.connection; import java.net.InetSocketAddress; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; @@ -28,6 +29,8 @@ import org.redisson.misc.ConnectionPool; import org.redisson.misc.PubSubConnectionPoll; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; public class SingleEntry extends MasterSlaveEntry { @@ -43,19 +46,28 @@ public class SingleEntry extends MasterSlaveEntry { } @Override - public void setupMasterEntry(String host, int port) { + public Future setupMasterEntry(String host, int port) { RedisClient masterClient = connectionManager.createClient(host, port); masterEntry = new ClientConnectionsEntry(masterClient, config.getMasterConnectionMinimumIdleSize(), config.getMasterConnectionPoolSize(), config.getSlaveConnectionMinimumIdleSize(), config.getSlaveSubscriptionConnectionPoolSize(), connectListener, NodeType.MASTER, connectionManager.getConnectionWatcher(), config); - writeConnectionHolder.add(masterEntry); - pubSubConnectionHolder.add(masterEntry); - } - - @Override - protected void initSlaveBalancer(MasterSlaveServersConfig config) { + final Promise res = connectionManager.newPromise(); + Future f = writeConnectionHolder.add(masterEntry); + Future s = pubSubConnectionHolder.add(masterEntry); + FutureListener listener = new FutureListener() { + AtomicInteger counter = new AtomicInteger(2); + @Override + public void operationComplete(Future future) throws Exception { + if (counter.decrementAndGet() == 0) { + res.setSuccess(null); + } + } + }; + f.addListener(listener); + s.addListener(listener); + return res; } @Override diff --git a/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 982ef5583..ab0688535 100644 --- a/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -39,7 +39,7 @@ public interface LoadBalancerManager { Collection freeze(String host, int port, FreezeReason freezeReason); - void add(ClientConnectionsEntry entry); + Future add(ClientConnectionsEntry entry); Future nextConnection(); diff --git a/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java b/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java index 0c49710a9..55a4ce744 100644 --- a/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java +++ b/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.PlatformDependent; public class LoadBalancerManagerImpl implements LoadBalancerManager { @@ -53,10 +54,16 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager { pubSubEntries = new PubSubConnectionPoll(config, connectionManager, entry); } - public synchronized void add(ClientConnectionsEntry entry) { - addr2Entry.put(entry.getClient().getAddr(), entry); - entries.add(entry); - pubSubEntries.add(entry); + public Future add(final ClientConnectionsEntry entry) { + Future f = entries.add(entry); + f.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + addr2Entry.put(entry.getClient().getAddr(), entry); + pubSubEntries.add(entry); + } + }); + return f; } public int getAvailableClients() { diff --git a/src/main/java/org/redisson/misc/ConnectionPool.java b/src/main/java/org/redisson/misc/ConnectionPool.java index b9e27a6b0..e2888b1bd 100644 --- a/src/main/java/org/redisson/misc/ConnectionPool.java +++ b/src/main/java/org/redisson/misc/ConnectionPool.java @@ -52,19 +52,22 @@ public class ConnectionPool { this.connectionManager = connectionManager; } - public void add(final ClientConnectionsEntry entry) { + public Future add(final ClientConnectionsEntry entry) { + final Promise promise = connectionManager.newPromise(); initConnections(entry, new Runnable() { @Override public void run() { entries.add(entry); + promise.setSuccess(null); } }, true); + return promise; } private void initConnections(final ClientConnectionsEntry entry, final Runnable runnable, boolean checkFreezed) { int minimumIdleSize = getMinimumIdleSize(entry); - if (minimumIdleSize == 0) { + if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) { runnable.run(); return; } @@ -72,6 +75,9 @@ public class ConnectionPool { final AtomicInteger completedConnections = new AtomicInteger(minimumIdleSize); for (int i = 0; i < minimumIdleSize; i++) { if ((checkFreezed && entry.isFreezed()) || !tryAcquireConnection(entry)) { + if (completedConnections.decrementAndGet() == 0) { + runnable.run(); + } continue; }