From 556b0b2a79e0d1d2f168fbd340b217ecf344b448 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 23 Nov 2015 10:31:18 +0300 Subject: [PATCH] SubscribesConnectionEntry renamed --- .../cluster/ClusterConnectionListener.java | 2 +- .../cluster/ClusterConnectionManager.java | 2 +- ...Entry.java => ClientConnectionsEntry.java} | 14 +++--- .../connection/ConnectionListener.java | 2 +- .../connection/ConnectionManager.java | 2 +- .../connection/DefaultConnectionListener.java | 2 +- .../org/redisson/connection/LoadBalancer.java | 2 +- .../connection/LoadBalancerManager.java | 4 +- .../connection/LoadBalancerManagerImpl.java | 22 ++++----- .../MasterSlaveConnectionManager.java | 2 +- .../redisson/connection/MasterSlaveEntry.java | 12 ++--- .../connection/RandomLoadBalancer.java | 2 +- .../connection/RoundRobinLoadBalancer.java | 2 +- .../connection/SentinelConnectionManager.java | 2 +- .../org/redisson/connection/SingleEntry.java | 6 +-- .../org/redisson/misc/ConnectionPool.java | 48 +++++++++---------- .../redisson/misc/MasterConnectionPool.java | 6 +-- .../redisson/misc/PubSubConnectionPoll.java | 14 +++--- 18 files changed, 73 insertions(+), 73 deletions(-) rename src/main/java/org/redisson/connection/{SubscribesConnectionEntry.java => ClientConnectionsEntry.java} (93%) diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionListener.java b/src/main/java/org/redisson/cluster/ClusterConnectionListener.java index 83be18e83..e37f41374 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionListener.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionListener.java @@ -20,7 +20,7 @@ import org.redisson.client.RedisException; import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.DefaultConnectionListener; import org.redisson.connection.FutureConnectionListener; -import org.redisson.connection.SubscribesConnectionEntry.NodeType; +import org.redisson.connection.ClientConnectionsEntry.NodeType; public class ClusterConnectionListener extends DefaultConnectionListener { diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index ad746d19f..3b235c487 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -36,7 +36,7 @@ import org.redisson.cluster.ClusterNodeInfo.Flag; import org.redisson.connection.CRC16; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.MasterSlaveEntry; -import org.redisson.connection.SubscribesConnectionEntry.FreezeReason; +import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java b/src/main/java/org/redisson/connection/ClientConnectionsEntry.java similarity index 93% rename from src/main/java/org/redisson/connection/SubscribesConnectionEntry.java rename to src/main/java/org/redisson/connection/ClientConnectionsEntry.java index a82efc64c..dd466122d 100644 --- a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java +++ b/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -31,7 +31,7 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; -public class SubscribesConnectionEntry { +public class ClientConnectionsEntry { final Logger log = LoggerFactory.getLogger(getClass()); @@ -48,17 +48,17 @@ public class SubscribesConnectionEntry { public enum NodeType {SLAVE, MASTER} private final NodeType nodeType; - private final ConnectionListener connectListener; + private final ConnectionListener connectionListener; private final Queue freeConnections = new ConcurrentLinkedQueue(); private final AtomicInteger freeConnectionsCounter = new AtomicInteger(); private final AtomicInteger failedAttempts = new AtomicInteger(); - public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize, ConnectionListener connectListener, NodeType serverMode) { + public ClientConnectionsEntry(RedisClient client, int poolSize, int subscribePoolSize, ConnectionListener connectionListener, NodeType serverMode) { this.client = client; this.freeConnectionsCounter.set(poolSize); - this.connectListener = connectListener; + this.connectionListener = connectionListener; this.nodeType = serverMode; freeSubscribeConnectionsCounter.set(subscribePoolSize); } @@ -145,7 +145,7 @@ public class SubscribesConnectionEntry { log.debug("new connection created: {}", conn); FutureConnectionListener listener = new FutureConnectionListener(connectionFuture, conn); - connectListener.onConnect(config, nodeType, listener); + connectionListener.onConnect(config, nodeType, listener); listener.executeCommands(); addReconnectListener(config, conn); @@ -160,7 +160,7 @@ public class SubscribesConnectionEntry { @Override public void onReconnect(RedisConnection conn, Promise connectionFuture) { FutureConnectionListener listener = new FutureConnectionListener(connectionFuture, conn); - connectListener.onConnect(config, nodeType, listener); + connectionListener.onConnect(config, nodeType, listener); listener.executeCommands(); } }); @@ -180,7 +180,7 @@ public class SubscribesConnectionEntry { log.debug("new pubsub connection created: {}", conn); FutureConnectionListener listener = new FutureConnectionListener(connectionFuture, conn); - connectListener.onConnect(config, nodeType, listener); + connectionListener.onConnect(config, nodeType, listener); listener.executeCommands(); addReconnectListener(config, conn); diff --git a/src/main/java/org/redisson/connection/ConnectionListener.java b/src/main/java/org/redisson/connection/ConnectionListener.java index 995a477aa..addcdada2 100644 --- a/src/main/java/org/redisson/connection/ConnectionListener.java +++ b/src/main/java/org/redisson/connection/ConnectionListener.java @@ -17,7 +17,7 @@ package org.redisson.connection; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisException; -import org.redisson.connection.SubscribesConnectionEntry.NodeType; +import org.redisson.connection.ClientConnectionsEntry.NodeType; public interface ConnectionListener { diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 63b961d6c..83e09ccd0 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -28,7 +28,7 @@ import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.cluster.ClusterSlotRange; -import org.redisson.connection.SubscribesConnectionEntry.FreezeReason; +import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.misc.InfinitySemaphoreLatch; import io.netty.channel.EventLoopGroup; diff --git a/src/main/java/org/redisson/connection/DefaultConnectionListener.java b/src/main/java/org/redisson/connection/DefaultConnectionListener.java index 6c5f9d91c..2992a8b8f 100644 --- a/src/main/java/org/redisson/connection/DefaultConnectionListener.java +++ b/src/main/java/org/redisson/connection/DefaultConnectionListener.java @@ -18,7 +18,7 @@ package org.redisson.connection; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisException; import org.redisson.client.protocol.RedisCommands; -import org.redisson.connection.SubscribesConnectionEntry.NodeType; +import org.redisson.connection.ClientConnectionsEntry.NodeType; public class DefaultConnectionListener implements ConnectionListener { diff --git a/src/main/java/org/redisson/connection/LoadBalancer.java b/src/main/java/org/redisson/connection/LoadBalancer.java index 9673e05f5..50c7882b7 100644 --- a/src/main/java/org/redisson/connection/LoadBalancer.java +++ b/src/main/java/org/redisson/connection/LoadBalancer.java @@ -19,6 +19,6 @@ import java.util.List; public interface LoadBalancer { - SubscribesConnectionEntry getEntry(List clientsCopy); + ClientConnectionsEntry getEntry(List clientsCopy); } diff --git a/src/main/java/org/redisson/connection/LoadBalancerManager.java b/src/main/java/org/redisson/connection/LoadBalancerManager.java index f1fc4bb74..60ee5d7a0 100644 --- a/src/main/java/org/redisson/connection/LoadBalancerManager.java +++ b/src/main/java/org/redisson/connection/LoadBalancerManager.java @@ -20,7 +20,7 @@ import java.util.Collection; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; -import org.redisson.connection.SubscribesConnectionEntry.FreezeReason; +import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import io.netty.util.concurrent.Future; @@ -38,7 +38,7 @@ public interface LoadBalancerManager { Collection freeze(String host, int port, FreezeReason freezeReason); - void add(SubscribesConnectionEntry entry); + void add(ClientConnectionsEntry entry); Future nextConnection(); diff --git a/src/main/java/org/redisson/connection/LoadBalancerManagerImpl.java b/src/main/java/org/redisson/connection/LoadBalancerManagerImpl.java index be5ef7db1..bfb99651a 100644 --- a/src/main/java/org/redisson/connection/LoadBalancerManagerImpl.java +++ b/src/main/java/org/redisson/connection/LoadBalancerManagerImpl.java @@ -26,7 +26,7 @@ import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisPubSubConnection; -import org.redisson.connection.SubscribesConnectionEntry.FreezeReason; +import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.misc.ConnectionPool; import org.redisson.misc.PubSubConnectionPoll; import org.slf4j.Logger; @@ -40,7 +40,7 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager { private final Logger log = LoggerFactory.getLogger(getClass()); private final ConnectionManager connectionManager; - private final Map addr2Entry = PlatformDependent.newConcurrentHashMap(); + private final Map addr2Entry = PlatformDependent.newConcurrentHashMap(); private final PubSubConnectionPoll pubSubEntries; private final ConnectionPool entries; @@ -50,7 +50,7 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager { pubSubEntries = new PubSubConnectionPoll(config, connectionManager, entry); } - public synchronized void add(SubscribesConnectionEntry entry) { + public synchronized void add(ClientConnectionsEntry entry) { addr2Entry.put(entry.getClient().getAddr(), entry); entries.add(entry); pubSubEntries.add(entry); @@ -58,7 +58,7 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager { public int getAvailableClients() { int count = 0; - for (SubscribesConnectionEntry connectionEntry : addr2Entry.values()) { + for (ClientConnectionsEntry connectionEntry : addr2Entry.values()) { if (!connectionEntry.isFreezed()) { count++; } @@ -68,7 +68,7 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager { public boolean unfreeze(String host, int port, FreezeReason freezeReason) { InetSocketAddress addr = new InetSocketAddress(host, port); - SubscribesConnectionEntry entry = addr2Entry.get(addr); + ClientConnectionsEntry entry = addr2Entry.get(addr); if (entry == null) { throw new IllegalStateException("Can't find " + addr + " in slaves!"); } @@ -90,7 +90,7 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager { public Collection freeze(String host, int port, FreezeReason freezeReason) { InetSocketAddress addr = new InetSocketAddress(host, port); - SubscribesConnectionEntry connectionEntry = addr2Entry.get(addr); + ClientConnectionsEntry connectionEntry = addr2Entry.get(addr); if (connectionEntry == null) { return Collections.emptyList(); } @@ -133,7 +133,7 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager { } public Future getConnection(InetSocketAddress addr) { - SubscribesConnectionEntry entry = addr2Entry.get(addr); + ClientConnectionsEntry entry = addr2Entry.get(addr); if (entry != null) { return entries.get(entry); } @@ -146,23 +146,23 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager { } public void returnSubscribeConnection(RedisPubSubConnection connection) { - SubscribesConnectionEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); + ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); pubSubEntries.returnConnection(entry, connection); } public void returnConnection(RedisConnection connection) { - SubscribesConnectionEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); + ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); entries.returnConnection(entry, connection); } public void shutdown() { - for (SubscribesConnectionEntry entry : addr2Entry.values()) { + for (ClientConnectionsEntry entry : addr2Entry.values()) { entry.getClient().shutdown(); } } public void shutdownAsync() { - for (SubscribesConnectionEntry entry : addr2Entry.values()) { + for (ClientConnectionsEntry entry : addr2Entry.values()) { connectionManager.shutdownAsync(entry.getClient()); } } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 85790007d..4ca1f01c7 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -39,7 +39,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.cluster.ClusterSlotRange; -import org.redisson.connection.SubscribesConnectionEntry.FreezeReason; +import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.misc.InfinitySemaphoreLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index afabccecd..1325bdc08 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -26,8 +26,8 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; import org.redisson.cluster.ClusterSlotRange; -import org.redisson.connection.SubscribesConnectionEntry.FreezeReason; -import org.redisson.connection.SubscribesConnectionEntry.NodeType; +import org.redisson.connection.ClientConnectionsEntry.FreezeReason; +import org.redisson.connection.ClientConnectionsEntry.NodeType; import org.redisson.misc.MasterConnectionPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +45,7 @@ public class MasterSlaveEntry { final Logger log = LoggerFactory.getLogger(getClass()); LoadBalancerManager slaveBalancer; - SubscribesConnectionEntry masterEntry; + ClientConnectionsEntry masterEntry; final ConnectionListener connectListener; @@ -80,7 +80,7 @@ public class MasterSlaveEntry { public void setupMasterEntry(String host, int port) { RedisClient client = connectionManager.createClient(host, port); - masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0, connectListener, NodeType.MASTER); + masterEntry = new ClientConnectionsEntry(client, config.getMasterConnectionPoolSize(), 0, connectListener, NodeType.MASTER); writeConnectionHolder.add(masterEntry); } @@ -101,7 +101,7 @@ public class MasterSlaveEntry { private void addSlave(String host, int port, boolean freezed, NodeType mode) { RedisClient client = connectionManager.createClient(host, port); - SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client, + ClientConnectionsEntry entry = new ClientConnectionsEntry(client, this.config.getSlaveConnectionPoolSize(), this.config.getSlaveSubscriptionConnectionPoolSize(), connectListener, mode); if (freezed) { @@ -133,7 +133,7 @@ public class MasterSlaveEntry { * */ public void changeMaster(String host, int port) { - SubscribesConnectionEntry oldMaster = masterEntry; + ClientConnectionsEntry oldMaster = masterEntry; setupMasterEntry(host, port); writeConnectionHolder.remove(oldMaster); if (slaveBalancer.getAvailableClients() > 1) { diff --git a/src/main/java/org/redisson/connection/RandomLoadBalancer.java b/src/main/java/org/redisson/connection/RandomLoadBalancer.java index edb36a407..b50925fb9 100644 --- a/src/main/java/org/redisson/connection/RandomLoadBalancer.java +++ b/src/main/java/org/redisson/connection/RandomLoadBalancer.java @@ -23,7 +23,7 @@ public class RandomLoadBalancer implements LoadBalancer { private final Random random = new SecureRandom(); - public SubscribesConnectionEntry getEntry(List clientsCopy) { + public ClientConnectionsEntry getEntry(List clientsCopy) { int ind = random.nextInt(clientsCopy.size()); return clientsCopy.get(ind); } diff --git a/src/main/java/org/redisson/connection/RoundRobinLoadBalancer.java b/src/main/java/org/redisson/connection/RoundRobinLoadBalancer.java index 6a6cc6d95..365d1b28f 100644 --- a/src/main/java/org/redisson/connection/RoundRobinLoadBalancer.java +++ b/src/main/java/org/redisson/connection/RoundRobinLoadBalancer.java @@ -23,7 +23,7 @@ public class RoundRobinLoadBalancer implements LoadBalancer { private final AtomicInteger index = new AtomicInteger(-1); @Override - public SubscribesConnectionEntry getEntry(List clientsCopy) { + public ClientConnectionsEntry getEntry(List clientsCopy) { int ind = Math.abs(index.incrementAndGet() % clientsCopy.size()); return clientsCopy.get(ind); } diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 76aee3b41..203604cf3 100755 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -33,7 +33,7 @@ import org.redisson.client.RedisPubSubConnection; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.pubsub.PubSubType; -import org.redisson.connection.SubscribesConnectionEntry.FreezeReason; +import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.misc.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index d82ccb087..c20235dbd 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -23,7 +23,7 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; import org.redisson.cluster.ClusterSlotRange; -import org.redisson.connection.SubscribesConnectionEntry.NodeType; +import org.redisson.connection.ClientConnectionsEntry.NodeType; import org.redisson.misc.ConnectionPool; import org.redisson.misc.PubSubConnectionPoll; @@ -36,7 +36,7 @@ public class SingleEntry extends MasterSlaveEntry { public SingleEntry(Set slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config, ConnectionListener connectListener) { super(slotRanges, connectionManager, config, connectListener); pubSubConnectionHolder = new PubSubConnectionPoll(config, connectionManager, this) { - protected SubscribesConnectionEntry getEntry() { + protected ClientConnectionsEntry getEntry() { return entries.get(0); } }; @@ -45,7 +45,7 @@ public class SingleEntry extends MasterSlaveEntry { @Override public void setupMasterEntry(String host, int port) { RedisClient masterClient = connectionManager.createClient(host, port); - masterEntry = new SubscribesConnectionEntry(masterClient, + masterEntry = new ClientConnectionsEntry(masterClient, config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize(), connectListener, NodeType.MASTER); writeConnectionHolder.add(masterEntry); pubSubConnectionHolder.add(masterEntry); diff --git a/src/main/java/org/redisson/misc/ConnectionPool.java b/src/main/java/org/redisson/misc/ConnectionPool.java index 093933e0b..40cf9a1cd 100644 --- a/src/main/java/org/redisson/misc/ConnectionPool.java +++ b/src/main/java/org/redisson/misc/ConnectionPool.java @@ -28,9 +28,9 @@ import org.redisson.client.RedisConnectionException; import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; -import org.redisson.connection.SubscribesConnectionEntry; -import org.redisson.connection.SubscribesConnectionEntry.FreezeReason; -import org.redisson.connection.SubscribesConnectionEntry.NodeType; +import org.redisson.connection.ClientConnectionsEntry; +import org.redisson.connection.ClientConnectionsEntry.FreezeReason; +import org.redisson.connection.ClientConnectionsEntry.NodeType; import io.netty.util.Timeout; import io.netty.util.TimerTask; @@ -40,7 +40,7 @@ import io.netty.util.concurrent.Promise; public class ConnectionPool { - protected final List entries = new CopyOnWriteArrayList(); + protected final List entries = new CopyOnWriteArrayList(); final Deque> promises = new LinkedBlockingDeque>(); @@ -67,7 +67,7 @@ public class ConnectionPool { // }, 1, 1, TimeUnit.SECONDS); } - public void add(final SubscribesConnectionEntry entry) { + public void add(final ClientConnectionsEntry entry) { initConnections(entry, new Runnable() { @Override public void run() { @@ -77,7 +77,7 @@ public class ConnectionPool { }, true); } - private void initConnections(final SubscribesConnectionEntry entry, final Runnable runnable, boolean checkFreezed) { + private void initConnections(final ClientConnectionsEntry entry, final Runnable runnable, boolean checkFreezed) { int minimumIdleSize = getMinimumIdleSize(entry); if (minimumIdleSize == 0) { @@ -110,21 +110,21 @@ public class ConnectionPool { } } - protected int getMinimumIdleSize(SubscribesConnectionEntry entry) { + protected int getMinimumIdleSize(ClientConnectionsEntry entry) { return config.getSlaveConnectionMinimumIdleSize(); } - public void remove(SubscribesConnectionEntry entry) { + public void remove(ClientConnectionsEntry entry) { entries.remove(entry); } - protected SubscribesConnectionEntry getEntry() { + protected ClientConnectionsEntry getEntry() { return config.getLoadBalancer().getEntry(entries); } public Future get() { for (int j = entries.size() - 1; j >= 0; j--) { - SubscribesConnectionEntry entry = getEntry(); + ClientConnectionsEntry entry = getEntry(); if (!entry.isFreezed() && tryAcquireConnection(entry)) { Promise promise = connectionManager.newPromise(); connect(entry, promise); @@ -137,7 +137,7 @@ public class ConnectionPool { return promise; } - public Future get(SubscribesConnectionEntry entry) { + public Future get(ClientConnectionsEntry entry) { if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed()) && tryAcquireConnection(entry)) { Promise promise = connectionManager.newPromise(); @@ -150,19 +150,19 @@ public class ConnectionPool { return connectionManager.newFailedFuture(exception); } - protected boolean tryAcquireConnection(SubscribesConnectionEntry entry) { + protected boolean tryAcquireConnection(ClientConnectionsEntry entry) { return entry.getFailedAttempts() < config.getSlaveFailedAttempts() && entry.tryAcquireConnection(); } - protected T poll(SubscribesConnectionEntry entry) { + protected T poll(ClientConnectionsEntry entry) { return (T) entry.pollConnection(); } - protected Future connect(SubscribesConnectionEntry entry) { + protected Future connect(ClientConnectionsEntry entry) { return (Future) entry.connect(config); } - private void connect(final SubscribesConnectionEntry entry, final Promise promise) { + private void connect(final ClientConnectionsEntry entry, final Promise promise) { T conn = poll(entry); if (conn != null) { if (!conn.isActive()) { @@ -196,7 +196,7 @@ public class ConnectionPool { }); } - private void promiseSuccessful(final SubscribesConnectionEntry entry, final Promise promise, T conn) { + private void promiseSuccessful(final ClientConnectionsEntry entry, final Promise promise, T conn) { entry.resetFailedAttempts(); if (!promise.trySuccess(conn)) { releaseConnection(entry, conn); @@ -204,7 +204,7 @@ public class ConnectionPool { } } - private void promiseFailure(SubscribesConnectionEntry entry, Promise promise, Throwable cause) { + private void promiseFailure(ClientConnectionsEntry entry, Promise promise, Throwable cause) { if (entry.incFailedAttempts() == config.getSlaveFailedAttempts()) { if (entry.getNodeType() == NodeType.SLAVE) { connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(), @@ -219,14 +219,14 @@ public class ConnectionPool { promise.tryFailure(cause); } - private void freezeMaster(SubscribesConnectionEntry entry) { + private void freezeMaster(ClientConnectionsEntry entry) { if (entry.freezeMaster(FreezeReason.RECONNECT)) { scheduleCheck(entry); } } - private void promiseFailure(SubscribesConnectionEntry entry, Promise promise, T conn) { + private void promiseFailure(ClientConnectionsEntry entry, Promise promise, T conn) { int attempts = entry.incFailedAttempts(); if (attempts == config.getSlaveFailedAttempts()) { if (entry.getNodeType() == NodeType.SLAVE) { @@ -247,7 +247,7 @@ public class ConnectionPool { promise.tryFailure(cause); } - private void scheduleCheck(final SubscribesConnectionEntry entry) { + private void scheduleCheck(final ClientConnectionsEntry entry) { connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { @@ -321,7 +321,7 @@ public class ConnectionPool { }, config.getSlaveReconnectionTimeout(), TimeUnit.MILLISECONDS); } - public void returnConnection(SubscribesConnectionEntry entry, T connection) { + public void returnConnection(ClientConnectionsEntry entry, T connection) { if (entry.isFreezed()) { connection.closeAsync(); } else { @@ -333,13 +333,13 @@ public class ConnectionPool { releaseConnection(entry); } - protected void releaseConnection(SubscribesConnectionEntry entry) { + protected void releaseConnection(ClientConnectionsEntry entry) { entry.releaseConnection(); handleQueue(entry, true); } - private void handleQueue(SubscribesConnectionEntry entry, boolean checkFreezed) { + private void handleQueue(ClientConnectionsEntry entry, boolean checkFreezed) { while (true) { if (checkFreezed && entry.isFreezed()) { return; @@ -361,7 +361,7 @@ public class ConnectionPool { } } - protected void releaseConnection(SubscribesConnectionEntry entry, T conn) { + protected void releaseConnection(ClientConnectionsEntry entry, T conn) { entry.releaseConnection(conn); } diff --git a/src/main/java/org/redisson/misc/MasterConnectionPool.java b/src/main/java/org/redisson/misc/MasterConnectionPool.java index 5d2b680b9..020f330e3 100644 --- a/src/main/java/org/redisson/misc/MasterConnectionPool.java +++ b/src/main/java/org/redisson/misc/MasterConnectionPool.java @@ -19,7 +19,7 @@ import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisConnection; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; -import org.redisson.connection.SubscribesConnectionEntry; +import org.redisson.connection.ClientConnectionsEntry; public class MasterConnectionPool extends ConnectionPool { @@ -29,12 +29,12 @@ public class MasterConnectionPool extends ConnectionPool { } @Override - protected SubscribesConnectionEntry getEntry() { + protected ClientConnectionsEntry getEntry() { return entries.get(0); } @Override - protected int getMinimumIdleSize(SubscribesConnectionEntry entry) { + protected int getMinimumIdleSize(ClientConnectionsEntry entry) { return config.getMasterConnectionMinimumIdleSize(); } diff --git a/src/main/java/org/redisson/misc/PubSubConnectionPoll.java b/src/main/java/org/redisson/misc/PubSubConnectionPoll.java index 804b4daa3..7228d4daa 100644 --- a/src/main/java/org/redisson/misc/PubSubConnectionPoll.java +++ b/src/main/java/org/redisson/misc/PubSubConnectionPoll.java @@ -19,7 +19,7 @@ import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisPubSubConnection; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; -import org.redisson.connection.SubscribesConnectionEntry; +import org.redisson.connection.ClientConnectionsEntry; import io.netty.util.concurrent.Future; @@ -30,32 +30,32 @@ public class PubSubConnectionPoll extends ConnectionPool } @Override - protected RedisPubSubConnection poll(SubscribesConnectionEntry entry) { + protected RedisPubSubConnection poll(ClientConnectionsEntry entry) { return entry.pollSubscribeConnection(); } @Override - protected int getMinimumIdleSize(SubscribesConnectionEntry entry) { + protected int getMinimumIdleSize(ClientConnectionsEntry entry) { return config.getSlaveSubscriptionConnectionMinimumIdleSize(); } @Override - protected Future connect(SubscribesConnectionEntry entry) { + protected Future connect(ClientConnectionsEntry entry) { return entry.connectPubSub(config); } @Override - protected boolean tryAcquireConnection(SubscribesConnectionEntry entry) { + protected boolean tryAcquireConnection(ClientConnectionsEntry entry) { return entry.tryAcquireSubscribeConnection(); } @Override - protected void releaseConnection(SubscribesConnectionEntry entry) { + protected void releaseConnection(ClientConnectionsEntry entry) { entry.releaseSubscribeConnection(); } @Override - protected void releaseConnection(SubscribesConnectionEntry entry, RedisPubSubConnection conn) { + protected void releaseConnection(ClientConnectionsEntry entry, RedisPubSubConnection conn) { entry.releaseSubscribeConnection(conn); }