Waiting for connection pool init at startup

pull/303/head
Nikita 9 years ago
parent 531a753f57
commit ce060ef651

@ -18,6 +18,7 @@ package org.redisson.cluster;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -40,6 +41,8 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
@ -70,7 +73,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Collection<ClusterPartition> partitions = parsePartitions(nodesValue);
for (ClusterPartition partition : partitions) {
addMasterEntry(partition, cfg);
Collection<Future<Void>> s = addMasterEntry(partition, cfg);
for (Future<Void> future : s) {
future.syncUninterruptibly();
}
}
break;
@ -108,20 +114,23 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
protected void initEntry(MasterSlaveServersConfig config) {
}
private void addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) {
private Collection<Future<Void>> addMasterEntry(final ClusterPartition partition, ClusterServersConfig cfg) {
if (partition.isMasterFail()) {
log.warn("add master: {} for slot ranges: {} failed. Reason - server has FAIL flag", partition.getMasterAddress(), partition.getSlotRanges());
return;
Future<Void> f = newSucceededFuture();
return Collections.singletonList(f);
}
RedisConnection connection = connect(cfg, partition.getMasterAddress());
if (connection == null) {
return;
Future<Void> f = newSucceededFuture();
return Collections.singletonList(f);
}
Map<String, String> params = connection.sync(RedisCommands.CLUSTER_INFO);
if ("fail".equals(params.get("cluster_state"))) {
log.warn("add master: {} for slot ranges: {} failed. Reason - cluster_state:fail", partition.getMasterAddress(), partition.getSlotRanges());
return;
Future<Void> f = newSucceededFuture();
return Collections.singletonList(f);
}
MasterSlaveServersConfig config = create(cfg);
@ -131,12 +140,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());
MasterSlaveEntry entry = new MasterSlaveEntry(partition.getSlotRanges(), this, config, connectListener);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
for (ClusterSlotRange slotRange : partition.getSlotRanges()) {
addEntry(slotRange, entry);
lastPartitions.put(slotRange, partition);
}
final MasterSlaveEntry entry = new MasterSlaveEntry(partition.getSlotRanges(), this, config, connectListener);
List<Future<Void>> fs = entry.initSlaveBalancer(config);
Future<Void> f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
f.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
for (ClusterSlotRange slotRange : partition.getSlotRanges()) {
addEntry(slotRange, entry);
lastPartitions.put(slotRange, partition);
}
}
});
fs.add(f);
return fs;
}
private void monitorClusterChange(final ClusterServersConfig cfg) {

@ -20,6 +20,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@ -172,7 +173,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange);
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config, connectListener);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
List<Future<Void>> fs = entry.initSlaveBalancer(config);
Future<Void> f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
fs.add(f);
for (Future<Void> future : fs) {
future.syncUninterruptibly();
}
addEntry(singleSlotRange, entry);
}
@ -677,6 +683,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return group.next().newPromise();
}
public <R> Future<R> newSucceededFuture() {
return group.next().newSucceededFuture(null);
}
@Override
public <R> Future<R> newFailedFuture(Throwable cause) {
return group.next().newFailedFuture(cause);

@ -17,7 +17,10 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@ -66,25 +69,27 @@ public class MasterSlaveEntry {
this.connectListener = connectListener;
slaveBalancer = new LoadBalancerManagerImpl(config, connectionManager, this);
initSlaveBalancer(config);
writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this);
}
protected void initSlaveBalancer(MasterSlaveServersConfig config) {
public List<Future<Void>> initSlaveBalancer(MasterSlaveServersConfig config) {
boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty();
addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER);
List<Future<Void>> result = new LinkedList<Future<Void>>();
Future<Void> f = addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER);
result.add(f);
for (URI address : config.getSlaveAddresses()) {
addSlave(address.getHost(), address.getPort(), false, NodeType.SLAVE);
f = addSlave(address.getHost(), address.getPort(), false, NodeType.SLAVE);
result.add(f);
}
return result;
}
public void setupMasterEntry(String host, int port) {
public Future<Void> setupMasterEntry(String host, int port) {
RedisClient client = connectionManager.createClient(host, port);
masterEntry = new ClientConnectionsEntry(client, config.getMasterConnectionMinimumIdleSize(), config.getMasterConnectionPoolSize(),
0, 0, connectListener, NodeType.MASTER, connectionManager.getConnectionWatcher(), config);
writeConnectionHolder.add(masterEntry);
return writeConnectionHolder.add(masterEntry);
}
public Collection<RedisPubSubConnection> slaveDown(String host, int port, FreezeReason freezeReason) {
@ -98,11 +103,11 @@ public class MasterSlaveEntry {
return conns;
}
public void addSlave(String host, int port) {
addSlave(host, port, true, NodeType.SLAVE);
public Future<Void> addSlave(String host, int port) {
return addSlave(host, port, true, NodeType.SLAVE);
}
private void addSlave(String host, int port, boolean freezed, NodeType mode) {
private Future<Void> addSlave(String host, int port, boolean freezed, NodeType mode) {
RedisClient client = connectionManager.createClient(host, port);
ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
this.config.getSlaveConnectionMinimumIdleSize(),
@ -113,7 +118,7 @@ public class MasterSlaveEntry {
entry.setFreezed(freezed);
entry.setFreezeReason(FreezeReason.SYSTEM);
}
slaveBalancer.add(entry);
return slaveBalancer.add(entry);
}
public RedisClient getClient() {

@ -29,6 +29,7 @@ import org.redisson.cluster.ClusterSlotRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
@ -82,7 +83,8 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange);
SingleEntry entry = new SingleEntry(slots, this, config, connectListener);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
Future<Void> f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
f.syncUninterruptibly();
addEntry(singleSlotRange, entry);
}

@ -17,6 +17,7 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
@ -28,6 +29,8 @@ import org.redisson.misc.ConnectionPool;
import org.redisson.misc.PubSubConnectionPoll;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
public class SingleEntry extends MasterSlaveEntry {
@ -43,19 +46,28 @@ public class SingleEntry extends MasterSlaveEntry {
}
@Override
public void setupMasterEntry(String host, int port) {
public Future<Void> setupMasterEntry(String host, int port) {
RedisClient masterClient = connectionManager.createClient(host, port);
masterEntry = new ClientConnectionsEntry(masterClient,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSlaveConnectionMinimumIdleSize(),
config.getSlaveSubscriptionConnectionPoolSize(), connectListener, NodeType.MASTER, connectionManager.getConnectionWatcher(), config);
writeConnectionHolder.add(masterEntry);
pubSubConnectionHolder.add(masterEntry);
}
@Override
protected void initSlaveBalancer(MasterSlaveServersConfig config) {
final Promise<Void> res = connectionManager.newPromise();
Future<Void> f = writeConnectionHolder.add(masterEntry);
Future<Void> s = pubSubConnectionHolder.add(masterEntry);
FutureListener<Void> listener = new FutureListener<Void>() {
AtomicInteger counter = new AtomicInteger(2);
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (counter.decrementAndGet() == 0) {
res.setSuccess(null);
}
}
};
f.addListener(listener);
s.addListener(listener);
return res;
}
@Override

@ -39,7 +39,7 @@ public interface LoadBalancerManager {
Collection<RedisPubSubConnection> freeze(String host, int port, FreezeReason freezeReason);
void add(ClientConnectionsEntry entry);
Future<Void> add(ClientConnectionsEntry entry);
Future<RedisConnection> nextConnection();

@ -36,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
public class LoadBalancerManagerImpl implements LoadBalancerManager {
@ -53,10 +54,16 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager {
pubSubEntries = new PubSubConnectionPoll(config, connectionManager, entry);
}
public synchronized void add(ClientConnectionsEntry entry) {
addr2Entry.put(entry.getClient().getAddr(), entry);
entries.add(entry);
pubSubEntries.add(entry);
public Future<Void> add(final ClientConnectionsEntry entry) {
Future<Void> f = entries.add(entry);
f.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
addr2Entry.put(entry.getClient().getAddr(), entry);
pubSubEntries.add(entry);
}
});
return f;
}
public int getAvailableClients() {

@ -52,19 +52,22 @@ public class ConnectionPool<T extends RedisConnection> {
this.connectionManager = connectionManager;
}
public void add(final ClientConnectionsEntry entry) {
public Future<Void> add(final ClientConnectionsEntry entry) {
final Promise<Void> promise = connectionManager.newPromise();
initConnections(entry, new Runnable() {
@Override
public void run() {
entries.add(entry);
promise.setSuccess(null);
}
}, true);
return promise;
}
private void initConnections(final ClientConnectionsEntry entry, final Runnable runnable, boolean checkFreezed) {
int minimumIdleSize = getMinimumIdleSize(entry);
if (minimumIdleSize == 0) {
if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) {
runnable.run();
return;
}
@ -72,6 +75,9 @@ public class ConnectionPool<T extends RedisConnection> {
final AtomicInteger completedConnections = new AtomicInteger(minimumIdleSize);
for (int i = 0; i < minimumIdleSize; i++) {
if ((checkFreezed && entry.isFreezed()) || !tryAcquireConnection(entry)) {
if (completedConnections.decrementAndGet() == 0) {
runnable.run();
}
continue;
}

Loading…
Cancel
Save