Ping fails if requirePass is set. #304

pull/303/head
Nikita 9 years ago
parent ce060ef651
commit 3d507c4fec

@ -33,6 +33,7 @@ import org.redisson.core.NodesGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
public class RedisNodes<N extends Node> implements NodesGroup<N> {
@ -58,34 +59,43 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (future.isSuccess()) {
RedisConnection c = future.getNow();
Future<String> r = future.getNow().async(RedisCommands.PING);
result.put(c, r);
final RedisConnection c = future.getNow();
Promise<RedisConnection> connectionFuture = connectionManager.newPromise();
connectionManager.getConnectListener().onConnect(connectionFuture, c, null, connectionManager.getConfig());
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
Future<String> r = c.async(RedisCommands.PING);
result.put(c, r);
latch.countDown();
}
});
} else {
latch.countDown();
}
latch.countDown();
}
});
}
long time = System.currentTimeMillis();
try {
latch.await(connectionManager.getConfig().getPingTimeout(), TimeUnit.MILLISECONDS);
latch.await(connectionManager.getConfig().getConnectTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (System.currentTimeMillis() - time >= connectionManager.getConfig().getPingTimeout()) {
if (System.currentTimeMillis() - time >= connectionManager.getConfig().getConnectTimeout()) {
for (Entry<RedisConnection, Future<String>> entry : result.entrySet()) {
entry.getKey().closeAsync();
}
return false;
}
time = System.currentTimeMillis();
boolean res = true;
for (Entry<RedisConnection, Future<String>> entry : result.entrySet()) {
Future<String> f = entry.getValue();
long timeout = Math.max(connectionManager.getConfig().getPingTimeout() - (System.currentTimeMillis() - time), 0);
f.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);
f.awaitUninterruptibly(connectionManager.getConfig().getPingTimeout(), TimeUnit.MILLISECONDS);
if (!"PONG".equals(f.getNow())) {
res = false;
}

@ -16,6 +16,7 @@
package org.redisson.cluster;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.DefaultConnectionListener;
@ -31,8 +32,8 @@ public class ClusterConnectionListener extends DefaultConnectionListener {
}
@Override
public void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener) throws RedisException {
super.onConnect(config, serverMode, connectionListener);
public void doConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener<? extends RedisConnection> connectionListener) throws RedisException {
super.doConnect(config, serverMode, connectionListener);
if (serverMode == NodeType.SLAVE && readFromSlaves) {
connectionListener.addCommand(RedisCommands.READONLY);
}

@ -152,10 +152,7 @@ public class ClientConnectionsEntry {
RedisConnection conn = future.getNow();
log.debug("new connection created: {}", conn);
FutureConnectionListener<RedisConnection> listener = new FutureConnectionListener<RedisConnection>(connectionFuture, conn);
connectionListener.onConnect(config, nodeType, listener);
listener.executeCommands();
connectionListener.onConnect(connectionFuture, conn, nodeType, config);
addReconnectListener(config, conn);
}
@ -167,9 +164,7 @@ public class ClientConnectionsEntry {
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn, Promise<RedisConnection> connectionFuture) {
FutureConnectionListener<RedisConnection> listener = new FutureConnectionListener<RedisConnection>(connectionFuture, conn);
connectionListener.onConnect(config, nodeType, listener);
listener.executeCommands();
connectionListener.onConnect(connectionFuture, conn, nodeType, config);
}
});
}
@ -187,10 +182,7 @@ public class ClientConnectionsEntry {
RedisPubSubConnection conn = future.getNow();
log.debug("new pubsub connection created: {}", conn);
FutureConnectionListener<RedisPubSubConnection> listener = new FutureConnectionListener<RedisPubSubConnection>(connectionFuture, conn);
connectionListener.onConnect(config, nodeType, listener);
listener.executeCommands();
connectionListener.onConnect(connectionFuture, conn, nodeType, config);
addReconnectListener(config, conn);
allSubscribeConnections.add(conn);

@ -16,11 +16,13 @@
package org.redisson.connection;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisException;
import org.redisson.client.RedisConnection;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import io.netty.util.concurrent.Promise;
public interface ConnectionListener {
void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener) throws RedisException;
<T extends RedisConnection> void onConnect(Promise<T> connectionFuture, T conn, NodeType nodeType, MasterSlaveServersConfig config);
}

@ -45,6 +45,8 @@ import io.netty.util.concurrent.Promise;
*/
public interface ConnectionManager {
ConnectionListener getConnectListener();
IdleConnectionWatcher getConnectionWatcher();
<R> Future<R> newFailedFuture(Throwable cause);

@ -16,14 +16,22 @@
package org.redisson.connection;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import io.netty.util.concurrent.Promise;
public class DefaultConnectionListener implements ConnectionListener {
@Override
public void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener)
public <T extends RedisConnection> void onConnect(Promise<T> connectionFuture, T conn, NodeType nodeType, MasterSlaveServersConfig config) {
FutureConnectionListener<T> listener = new FutureConnectionListener<T>(connectionFuture, conn);
doConnect(config, nodeType, listener);
listener.executeCommands();
}
protected void doConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener<? extends RedisConnection> connectionListener)
throws RedisException {
if (config.getPassword() != null) {
connectionListener.addCommand(RedisCommands.AUTH, config.getPassword());

@ -169,6 +169,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
initEntry(config);
}
public ConnectionListener getConnectListener() {
return connectListener;
}
protected void initEntry(MasterSlaveServersConfig config) {
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange);
@ -196,12 +200,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public RedisClient createClient(String host, int port) {
RedisClient client = createClient(host, port, config.getConnectTimeout());
clients.add(new RedisClientEntry(client));
clients.add(new RedisClientEntry(client, this));
return client;
}
public void shutdownAsync(RedisClient client) {
clients.remove(new RedisClientEntry(client));
clients.remove(new RedisClientEntry(client, this));
client.shutdownAsync();
}

@ -18,18 +18,23 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Map;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.ClusterNode;
import io.netty.util.concurrent.Promise;
public class RedisClientEntry implements ClusterNode {
private final RedisClient client;
private final ConnectionManager manager;
public RedisClientEntry(RedisClient client) {
public RedisClientEntry(RedisClient client, ConnectionManager manager) {
super();
this.client = client;
this.manager = manager;
}
public RedisClient getClient() {
@ -41,9 +46,17 @@ public class RedisClientEntry implements ClusterNode {
return client.getAddr();
}
private RedisConnection connect() {
RedisConnection c = client.connect();
Promise<RedisConnection> future = manager.newPromise();
manager.getConnectListener().onConnect(future, c, null, manager.getConfig());
future.syncUninterruptibly();
return future.getNow();
}
@Override
public boolean ping() {
RedisConnection c = client.connect();
RedisConnection c = connect();
try {
return "PONG".equals(c.sync(RedisCommands.PING));
} catch (Exception e) {
@ -80,7 +93,7 @@ public class RedisClientEntry implements ClusterNode {
@Override
public Map<String, String> info() {
RedisConnection c = client.connect();
RedisConnection c = connect();
try {
return c.sync(RedisCommands.CLUSTER_INFO);
} catch (Exception e) {

@ -34,6 +34,8 @@ public interface NodesGroup<N extends Node> {
/**
* Ping all Redis nodes
*
* @return <code>true</code> if all nodes have replied "PONG", <code>false</code> in other case.
*/
boolean pingAll();

Loading…
Cancel
Save