From aa228315e9cd5930f9f8d4478a7f33333c398c5e Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 19 Dec 2017 18:46:49 +0300 Subject: [PATCH] refactoring --- .../redisson/connection/MasterSlaveEntry.java | 90 ++++++++++--------- 1 file changed, 49 insertions(+), 41 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 68a52c069..4023134b9 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -75,7 +75,7 @@ public class MasterSlaveEntry { final MasterPubSubConnectionPool pubSubConnectionPool; final AtomicBoolean active = new AtomicBoolean(true); - + public MasterSlaveEntry(Set slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { for (ClusterSlotRange clusterSlotRange : slotRanges) { for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) { @@ -89,7 +89,7 @@ public class MasterSlaveEntry { writeConnectionPool = new MasterConnectionPool(config, connectionManager, this); pubSubConnectionPool = new MasterPubSubConnectionPool(config, connectionManager, this); } - + public MasterSlaveServersConfig getConfig() { return config; } @@ -110,31 +110,40 @@ public class MasterSlaveEntry { } public RPromise setupMasterEntry(URI address) { + final RPromise result = new RedissonPromise(); + RedisClient client = connectionManager.createClient(NodeType.MASTER, address); - masterEntry = new ClientConnectionsEntry( - client, - config.getMasterConnectionMinimumIdleSize(), - config.getMasterConnectionPoolSize(), - config.getSubscriptionConnectionMinimumIdleSize(), - config.getSubscriptionConnectionPoolSize(), - connectionManager, - NodeType.MASTER); - - RPromise result = new RedissonPromise(); - CountableListener listener = new CountableListener(result, client); RFuture addrFuture = client.resolveAddr(); - listener.incCounter(); - addrFuture.addListener(listener); - - RFuture writeFuture = writeConnectionPool.add(masterEntry); - listener.incCounter(); - writeFuture.addListener(listener); - - if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { - RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); - listener.incCounter(); - pubSubFuture.addListener(listener); - } + addrFuture.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + masterEntry = new ClientConnectionsEntry( + client, + config.getMasterConnectionMinimumIdleSize(), + config.getMasterConnectionPoolSize(), + config.getSubscriptionConnectionMinimumIdleSize(), + config.getSubscriptionConnectionPoolSize(), + connectionManager, + NodeType.MASTER); + + CountableListener listener = new CountableListener(result, client); + RFuture writeFuture = writeConnectionPool.add(masterEntry); + listener.incCounter(); + writeFuture.addListener(listener); + + if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { + RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); + listener.incCounter(); + pubSubFuture.addListener(listener); + } + } + }); return result; } @@ -185,7 +194,7 @@ public class MasterSlaveEntry { if (connection == null) { break; } - + connection.closeAsync().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -224,7 +233,6 @@ public class MasterSlaveEntry { return; } - System.out.println("channelName " + channelName + " resubscribed!"); subscribeCodec.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -335,7 +343,7 @@ public class MasterSlaveEntry { public boolean hasSlave(RedisClient redisClient) { return slaveBalancer.contains(redisClient); } - + public boolean hasSlave(URI addr) { return slaveBalancer.contains(addr); } @@ -344,19 +352,8 @@ public class MasterSlaveEntry { return addSlave(address, false, NodeType.SLAVE); } - private RFuture addSlave(URI address, boolean freezed, NodeType nodeType) { + private RFuture addSlave(URI address, final boolean freezed, final NodeType nodeType) { RedisClient client = connectionManager.createClient(NodeType.SLAVE, address); - final ClientConnectionsEntry entry = new ClientConnectionsEntry(client, - this.config.getSlaveConnectionMinimumIdleSize(), - this.config.getSlaveConnectionPoolSize(), - this.config.getSubscriptionConnectionMinimumIdleSize(), - this.config.getSubscriptionConnectionPoolSize(), connectionManager, nodeType); - if (freezed) { - synchronized (entry) { - entry.setFreezed(freezed); - entry.setFreezeReason(FreezeReason.SYSTEM); - } - } final RPromise result = new RedissonPromise(); RFuture addrFuture = client.resolveAddr(); @@ -367,7 +364,18 @@ public class MasterSlaveEntry { result.tryFailure(future.cause()); return; } - + + ClientConnectionsEntry entry = new ClientConnectionsEntry(client, + config.getSlaveConnectionMinimumIdleSize(), + config.getSlaveConnectionPoolSize(), + config.getSubscriptionConnectionMinimumIdleSize(), + config.getSubscriptionConnectionPoolSize(), connectionManager, nodeType); + if (freezed) { + synchronized (entry) { + entry.setFreezed(freezed); + entry.setFreezeReason(FreezeReason.SYSTEM); + } + } RFuture addFuture = slaveBalancer.add(entry); addFuture.addListener(new TransferListener(result)); }