diff --git a/src/main/java/org/redisson/RedisNodes.java b/src/main/java/org/redisson/RedisNodes.java index 2db86a4b7..64d879072 100644 --- a/src/main/java/org/redisson/RedisNodes.java +++ b/src/main/java/org/redisson/RedisNodes.java @@ -33,6 +33,7 @@ import org.redisson.core.NodesGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; public class RedisNodes implements NodesGroup { @@ -58,34 +59,43 @@ public class RedisNodes implements NodesGroup { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { - RedisConnection c = future.getNow(); - Future r = future.getNow().async(RedisCommands.PING); - result.put(c, r); + final RedisConnection c = future.getNow(); + Promise connectionFuture = connectionManager.newPromise(); + connectionManager.getConnectListener().onConnect(connectionFuture, c, null, connectionManager.getConfig()); + connectionFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + Future r = c.async(RedisCommands.PING); + result.put(c, r); + latch.countDown(); + } + }); + } else { + latch.countDown(); } - latch.countDown(); } }); } long time = System.currentTimeMillis(); try { - latch.await(connectionManager.getConfig().getPingTimeout(), TimeUnit.MILLISECONDS); + latch.await(connectionManager.getConfig().getConnectTimeout(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - if (System.currentTimeMillis() - time >= connectionManager.getConfig().getPingTimeout()) { + if (System.currentTimeMillis() - time >= connectionManager.getConfig().getConnectTimeout()) { for (Entry> entry : result.entrySet()) { entry.getKey().closeAsync(); } return false; } + time = System.currentTimeMillis(); boolean res = true; for (Entry> entry : result.entrySet()) { Future f = entry.getValue(); - long timeout = Math.max(connectionManager.getConfig().getPingTimeout() - (System.currentTimeMillis() - time), 0); - f.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS); + f.awaitUninterruptibly(connectionManager.getConfig().getPingTimeout(), TimeUnit.MILLISECONDS); if (!"PONG".equals(f.getNow())) { res = false; } diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionListener.java b/src/main/java/org/redisson/cluster/ClusterConnectionListener.java index e37f41374..f914cdc3c 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionListener.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionListener.java @@ -16,6 +16,7 @@ package org.redisson.cluster; import org.redisson.MasterSlaveServersConfig; +import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.DefaultConnectionListener; @@ -31,8 +32,8 @@ public class ClusterConnectionListener extends DefaultConnectionListener { } @Override - public void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener) throws RedisException { - super.onConnect(config, serverMode, connectionListener); + public void doConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener) throws RedisException { + super.doConnect(config, serverMode, connectionListener); if (serverMode == NodeType.SLAVE && readFromSlaves) { connectionListener.addCommand(RedisCommands.READONLY); } diff --git a/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index a8b5f2a47..146611049 100644 --- a/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -152,10 +152,7 @@ public class ClientConnectionsEntry { RedisConnection conn = future.getNow(); log.debug("new connection created: {}", conn); - FutureConnectionListener listener = new FutureConnectionListener(connectionFuture, conn); - connectionListener.onConnect(config, nodeType, listener); - listener.executeCommands(); - + connectionListener.onConnect(connectionFuture, conn, nodeType, config); addReconnectListener(config, conn); } @@ -167,9 +164,7 @@ public class ClientConnectionsEntry { conn.setReconnectListener(new ReconnectListener() { @Override public void onReconnect(RedisConnection conn, Promise connectionFuture) { - FutureConnectionListener listener = new FutureConnectionListener(connectionFuture, conn); - connectionListener.onConnect(config, nodeType, listener); - listener.executeCommands(); + connectionListener.onConnect(connectionFuture, conn, nodeType, config); } }); } @@ -187,10 +182,7 @@ public class ClientConnectionsEntry { RedisPubSubConnection conn = future.getNow(); log.debug("new pubsub connection created: {}", conn); - FutureConnectionListener listener = new FutureConnectionListener(connectionFuture, conn); - connectionListener.onConnect(config, nodeType, listener); - listener.executeCommands(); - + connectionListener.onConnect(connectionFuture, conn, nodeType, config); addReconnectListener(config, conn); allSubscribeConnections.add(conn); diff --git a/src/main/java/org/redisson/connection/ConnectionListener.java b/src/main/java/org/redisson/connection/ConnectionListener.java index addcdada2..69ba06a7e 100644 --- a/src/main/java/org/redisson/connection/ConnectionListener.java +++ b/src/main/java/org/redisson/connection/ConnectionListener.java @@ -16,11 +16,13 @@ package org.redisson.connection; import org.redisson.MasterSlaveServersConfig; -import org.redisson.client.RedisException; +import org.redisson.client.RedisConnection; import org.redisson.connection.ClientConnectionsEntry.NodeType; +import io.netty.util.concurrent.Promise; + public interface ConnectionListener { - void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener) throws RedisException; + void onConnect(Promise connectionFuture, T conn, NodeType nodeType, MasterSlaveServersConfig config); } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index f5e735515..4cf620aed 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -45,6 +45,8 @@ import io.netty.util.concurrent.Promise; */ public interface ConnectionManager { + ConnectionListener getConnectListener(); + IdleConnectionWatcher getConnectionWatcher(); Future newFailedFuture(Throwable cause); diff --git a/src/main/java/org/redisson/connection/DefaultConnectionListener.java b/src/main/java/org/redisson/connection/DefaultConnectionListener.java index 2992a8b8f..c3436cf81 100644 --- a/src/main/java/org/redisson/connection/DefaultConnectionListener.java +++ b/src/main/java/org/redisson/connection/DefaultConnectionListener.java @@ -16,14 +16,22 @@ package org.redisson.connection; import org.redisson.MasterSlaveServersConfig; +import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.ClientConnectionsEntry.NodeType; +import io.netty.util.concurrent.Promise; + public class DefaultConnectionListener implements ConnectionListener { - @Override - public void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener) + public void onConnect(Promise connectionFuture, T conn, NodeType nodeType, MasterSlaveServersConfig config) { + FutureConnectionListener listener = new FutureConnectionListener(connectionFuture, conn); + doConnect(config, nodeType, listener); + listener.executeCommands(); + } + + protected void doConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener) throws RedisException { if (config.getPassword() != null) { connectionListener.addCommand(RedisCommands.AUTH, config.getPassword()); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 4cd513cc9..f9774f90e 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -169,6 +169,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { initEntry(config); } + public ConnectionListener getConnectListener() { + return connectListener; + } + protected void initEntry(MasterSlaveServersConfig config) { HashSet slots = new HashSet(); slots.add(singleSlotRange); @@ -196,12 +200,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public RedisClient createClient(String host, int port) { RedisClient client = createClient(host, port, config.getConnectTimeout()); - clients.add(new RedisClientEntry(client)); + clients.add(new RedisClientEntry(client, this)); return client; } public void shutdownAsync(RedisClient client) { - clients.remove(new RedisClientEntry(client)); + clients.remove(new RedisClientEntry(client, this)); client.shutdownAsync(); } diff --git a/src/main/java/org/redisson/connection/RedisClientEntry.java b/src/main/java/org/redisson/connection/RedisClientEntry.java index 316cdcdb6..c1c102d07 100644 --- a/src/main/java/org/redisson/connection/RedisClientEntry.java +++ b/src/main/java/org/redisson/connection/RedisClientEntry.java @@ -18,18 +18,23 @@ package org.redisson.connection; import java.net.InetSocketAddress; import java.util.Map; +import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.protocol.RedisCommands; import org.redisson.core.ClusterNode; +import io.netty.util.concurrent.Promise; + public class RedisClientEntry implements ClusterNode { private final RedisClient client; + private final ConnectionManager manager; - public RedisClientEntry(RedisClient client) { + public RedisClientEntry(RedisClient client, ConnectionManager manager) { super(); this.client = client; + this.manager = manager; } public RedisClient getClient() { @@ -41,9 +46,17 @@ public class RedisClientEntry implements ClusterNode { return client.getAddr(); } + private RedisConnection connect() { + RedisConnection c = client.connect(); + Promise future = manager.newPromise(); + manager.getConnectListener().onConnect(future, c, null, manager.getConfig()); + future.syncUninterruptibly(); + return future.getNow(); + } + @Override public boolean ping() { - RedisConnection c = client.connect(); + RedisConnection c = connect(); try { return "PONG".equals(c.sync(RedisCommands.PING)); } catch (Exception e) { @@ -80,7 +93,7 @@ public class RedisClientEntry implements ClusterNode { @Override public Map info() { - RedisConnection c = client.connect(); + RedisConnection c = connect(); try { return c.sync(RedisCommands.CLUSTER_INFO); } catch (Exception e) { diff --git a/src/main/java/org/redisson/core/NodesGroup.java b/src/main/java/org/redisson/core/NodesGroup.java index 80a125d61..00ebea4cd 100644 --- a/src/main/java/org/redisson/core/NodesGroup.java +++ b/src/main/java/org/redisson/core/NodesGroup.java @@ -34,6 +34,8 @@ public interface NodesGroup { /** * Ping all Redis nodes + * + * @return true if all nodes have replied "PONG", false in other case. */ boolean pingAll();