Merge pull request #520 from fransiskusx/improve_auth_reconnect

improve auth reconnect
pull/528/head
Nikita Koksharov 9 years ago
commit 6a263fc41f

@ -19,6 +19,7 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.ReconnectListener; import org.redisson.client.ReconnectListener;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
@ -186,6 +187,10 @@ public class ClientConnectionsEntry {
}); });
} }
public MasterSlaveServersConfig getConfig() {
return connectionManager.getConfig();
}
public Future<RedisPubSubConnection> connectPubSub() { public Future<RedisPubSubConnection> connectPubSub() {
final Promise<RedisPubSubConnection> connectionFuture = ImmediateEventExecutor.INSTANCE.newPromise(); final Promise<RedisPubSubConnection> connectionFuture = ImmediateEventExecutor.INSTANCE.newPromise();
Future<RedisPubSubConnection> future = client.connectPubSubAsync(); Future<RedisPubSubConnection> future = client.connectPubSubAsync();

@ -326,13 +326,12 @@ abstract class ConnectionPool<T extends RedisConnection> {
return; return;
} }
Future<String> f = c.asyncWithTimeout(null, RedisCommands.PING); final FutureListener<String> pingListener = new FutureListener<String>() {
f.addListener(new FutureListener<String>() {
@Override @Override
public void operationComplete(Future<String> future) throws Exception { public void operationComplete(Future<String> future) throws Exception {
try { try {
if (entry.getFreezeReason() != FreezeReason.RECONNECT if (entry.getFreezeReason() != FreezeReason.RECONNECT
|| !entry.isFreezed()) { || !entry.isFreezed()) {
return; return;
} }
@ -342,7 +341,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
promise.addListener(new FutureListener<Void>() { promise.addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) public void operationComplete(Future<Void> future)
throws Exception { throws Exception {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
log.info("slave {} successfully reconnected", entry.getClient().getAddr()); log.info("slave {} successfully reconnected", entry.getClient().getAddr());
@ -365,13 +364,32 @@ abstract class ConnectionPool<T extends RedisConnection> {
c.closeAsync(); c.closeAsync();
} }
} }
}); };
if (entry.getConfig().getPassword() != null) {
Future<Void> temp = c.asyncWithTimeout(null, RedisCommands.AUTH, config.getPassword());
FutureListener<Void> listener = new FutureListener<Void> () {
@Override public void operationComplete (Future < Void > future)throws Exception {
ping(c, pingListener);
}
};
temp.addListener(listener);
} else {
ping(c, pingListener);
}
} }
}); });
} }
}, config.getReconnectionTimeout(), TimeUnit.MILLISECONDS); }, config.getReconnectionTimeout(), TimeUnit.MILLISECONDS);
} }
private void ping(RedisConnection c, final FutureListener<String> pingListener) {
Future<String> f = c.asyncWithTimeout(null, RedisCommands.PING);
f.addListener(pingListener);
}
public void returnConnection(ClientConnectionsEntry entry, T connection) { public void returnConnection(ClientConnectionsEntry entry, T connection) {
if (entry.isFreezed()) { if (entry.isFreezed()) {
connection.closeAsync(); connection.closeAsync();

Loading…
Cancel
Save