Connection timeout handling reimplemented. #262

pull/297/head
Nikita 9 years ago
parent b030d88101
commit 3ea1931271

@ -26,7 +26,7 @@ class BaseConfig<T extends BaseConfig<T>> {
private int pingTimeout = 1000;
/**
* Connect timeout used for any Redis server connection.
* This timeout used during connection establishment to any Redis server.
* Value in milliseconds.
*
*/
@ -37,9 +37,9 @@ class BaseConfig<T extends BaseConfig<T>> {
* Then amount is reached exception will be thrown in case of <b>sync</b> operation usage
* or <code>Future</code> callback fails in case of <b>async</b> operation.
*/
private int timeout = 60000;
private int timeout = 1000;
private int retryAttempts = 20;
private int retryAttempts = 3;
private int retryInterval = 1000;
@ -128,7 +128,7 @@ class BaseConfig<T extends BaseConfig<T>> {
}
/**
* Time pause before next reconnection attempt.
* Time pause before next command attempt.
*
* Used then connection with redis server is down.
*
@ -216,7 +216,7 @@ class BaseConfig<T extends BaseConfig<T>> {
}
/**
* Connect timeout used for any Redis server connection.
* This timeout used during connection establishment to any Redis server.
*
* @param connectTimeout - timeout in milliseconds
* @return

@ -246,7 +246,9 @@ public class CommandBatchExecutorService extends CommandExecutorService {
};
ex.set(new RedisTimeoutException("Batch command execution timeout"));
final Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
final AtomicReference<Timeout> timeoutRef = new AtomicReference<Timeout>();
Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
timeoutRef.set(timeout);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
@ -255,10 +257,10 @@ public class CommandBatchExecutorService extends CommandExecutorService {
return;
}
if (!connFuture.isSuccess()) {
timeout.cancel();
ex.set(convertException(connFuture));
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
if (timeoutRef.get().cancel()) {
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
return;
}
@ -279,17 +281,18 @@ public class CommandBatchExecutorService extends CommandExecutorService {
}
if (!future.isSuccess()) {
timeout.cancel();
ex.set(new WriteRedisConnectionException("Can't write commands batch to channel: " + future.channel(), future.cause()));
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
if (timeoutRef.get().cancel()) {
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
}
}
});
if (entry.isReadOnlyMode()) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeout));
attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeoutRef));
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeout));
attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeoutRef));
}
}
});
@ -297,7 +300,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
attemptPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
timeout.cancel();
timeoutRef.get().cancel();
if (future.isCancelled() || mainPromise.isCancelled()) {
return;
}

@ -424,7 +424,11 @@ public class CommandExecutorService implements CommandExecutor {
}
final Promise<R> attemptPromise = connectionManager.newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();
final AtomicReference<ChannelFuture> writeFutureRef = new AtomicReference<ChannelFuture>();
final AtomicReference<RedisException> exceptionRef = new AtomicReference<RedisException>();
final AtomicReference<Timeout> timeoutRef = new AtomicReference<Timeout>();
final Future<RedisConnection> connectionFuture;
if (readOnlyMode) {
@ -440,6 +444,17 @@ public class CommandExecutorService implements CommandExecutor {
connectionManager.getShutdownLatch().release();
}
if ((writeFutureRef.get() == null || !writeFutureRef.get().isDone())
&& connectionFuture.isSuccess()) {
Timeout newTimeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
timeoutRef.set(newTimeout);
return;
}
if (writeFutureRef.get() != null && writeFutureRef.get().isSuccess()) {
return;
}
if (attemptPromise.isDone()) {
return;
}
@ -450,7 +465,7 @@ public class CommandExecutorService implements CommandExecutor {
}
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
attemptPromise.tryFailure(ex.get());
attemptPromise.tryFailure(exceptionRef.get());
return;
}
if (!attemptPromise.cancel(false)) {
@ -462,56 +477,69 @@ public class CommandExecutorService implements CommandExecutor {
}
};
ex.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params " + Arrays.toString(params) + " attempt " + attempt));
final Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
exceptionRef.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params " + Arrays.toString(params)));
Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
timeoutRef.set(timeout);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (attemptPromise.isDone() || connFuture.isCancelled() || mainPromise.isCancelled()) {
if (attemptPromise.isDone() || connFuture.isCancelled() || mainPromise.isCancelled() || timeoutRef.get().isExpired()) {
return;
}
if (!connFuture.isSuccess()) {
timeout.cancel();
ex.set(convertException(connFuture));
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
exceptionRef.set(convertException(connFuture));
return;
}
RedisConnection connection = connFuture.getNow();
ChannelFuture writeFuture = null;
if (source.getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
Promise<Void> promise = connectionManager.newPromise();
list.add(new CommandData<Void, Void>(promise, codec, RedisCommands.ASKING, new Object[] {}));
list.add(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
Promise<Void> main = connectionManager.newPromise();
writeFuture = connection.send(new CommandsData(main, list));
ChannelFuture future = connection.send(new CommandsData(main, list));
writeFutureRef.set(future);
} else {
log.debug("getting connection for command {} from slot {} using node {}", command, source, connection.getRedisClient().getAddr());
writeFuture = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
writeFutureRef.set(future);
}
writeFuture.addListener(new ChannelFutureListener() {
writeFutureRef.get().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (attemptPromise.isDone() || future.isCancelled() || mainPromise.isCancelled()) {
if (attemptPromise.isDone() || mainPromise.isCancelled()) {
return;
}
if (!future.isSuccess()) {
timeout.cancel();
ex.set(new WriteRedisConnectionException(
exceptionRef.set(new WriteRedisConnectionException(
"Can't write command: " + command + ", params: " + params + " to channel: " + future.channel(), future.cause()));
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
} else {
timeoutRef.get().cancel();
TimerTask timeoutTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (attemptPromise.isDone()) {
return;
}
attemptPromise.tryFailure(exceptionRef.get());
}
};
Timeout timeout = connectionManager.newTimeout(timeoutTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
timeoutRef.set(timeout);
}
}
});
if (readOnlyMode) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeout));
attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeoutRef));
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeout));
attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeoutRef));
}
}
});
@ -519,28 +547,25 @@ public class CommandExecutorService implements CommandExecutor {
attemptPromise.addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
timeout.cancel();
timeoutRef.get().cancel();
if (future.isCancelled() || mainPromise.isCancelled()) {
return;
}
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), messageDecoder, codec, command, params, mainPromise, attempt);
async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), messageDecoder, codec, command, params, mainPromise, 0);
return;
}
if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause();
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), messageDecoder, codec, command, params, mainPromise, attempt);
async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), messageDecoder, codec, command, params, mainPromise, 0);
return;
}
if (future.cause() instanceof RedisLoadingException) {
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, attempt);
async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, 0);
return;
}

@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisLoadingException;
import org.redisson.client.RedisMovedException;

@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
@ -72,10 +73,10 @@ public interface ConnectionManager {
Future<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command);
<T> FutureListener<T> createReleaseReadListener(NodeSource source,
RedisConnection conn, Timeout timeout);
RedisConnection conn, AtomicReference<Timeout> timeout);
<T> FutureListener<T> createReleaseWriteListener(NodeSource source,
RedisConnection conn, Timeout timeout);
RedisConnection conn, AtomicReference<Timeout> timeout);
RedisClient createClient(String host, int port, int timeout);

@ -16,6 +16,7 @@
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@ -24,6 +25,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.Config;
import org.redisson.MasterSlaveServersConfig;
@ -147,12 +149,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected void init(MasterSlaveServersConfig config) {
this.config = config;
int minTimeout = Math.min(config.getRetryInterval(), config.getTimeout());
int[] timeouts = new int[] {config.getRetryInterval(), config.getTimeout(), config.getSlaveReconnectionTimeout()};
Arrays.sort(timeouts);
int minTimeout = timeouts[0];
if (minTimeout % 100 != 0) {
timer = new HashedWheelTimer((minTimeout % 100) / 2, TimeUnit.MILLISECONDS);
minTimeout = (minTimeout % 100) / 2;
} else if (minTimeout == 100) {
minTimeout = 50;
} else {
timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS);
minTimeout = 100;
}
timer = new HashedWheelTimer(minTimeout, TimeUnit.MILLISECONDS);
initEntry(config);
}
@ -195,7 +202,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public <T> FutureListener<T> createReleaseWriteListener(final NodeSource source,
final RedisConnection conn, final Timeout timeout) {
final RedisConnection conn, final AtomicReference<Timeout> timeout) {
return new FutureListener<T>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
@ -210,7 +217,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
shutdownLatch.release();
timeout.cancel();
timeout.get().cancel();
releaseWrite(source, conn);
}
};
@ -218,7 +225,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public <T> FutureListener<T> createReleaseReadListener(final NodeSource source,
final RedisConnection conn, final Timeout timeout) {
final RedisConnection conn, final AtomicReference<Timeout> timeout) {
return new FutureListener<T>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
@ -233,7 +240,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
shutdownLatch.release();
timeout.cancel();
timeout.get().cancel();
releaseRead(source, conn);
}
};

@ -141,9 +141,14 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
// more than one slave available, so master could be removed from slaves
connectionManager.slaveDown(this, host, port, FreezeReason.SYSTEM);
}
oldMaster.freezeMaster(FreezeReason.MANAGER);
connectionManager.shutdownAsync(oldMaster.getClient());
}
public void freeze() {
masterEntry.freezeMaster(FreezeReason.MANAGER);
}
public void shutdownMasterAsync() {
if (!active.compareAndSet(true, false)) {
return;

@ -154,7 +154,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
onSlaveAdded(addr, msg);
}
if ("+sdown".equals(channel)) {
onSlaveDown(addr, msg);
onNodeDown(addr, msg);
}
if ("-sdown".equals(channel)) {
onSlaveUp(addr, msg);
@ -212,7 +212,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
private void onSlaveDown(URI sentinelAddr, String msg) {
private void onNodeDown(URI sentinelAddr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {
@ -232,7 +232,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
log.info("sentinel: {} has down", addr);
}
} else if ("master".equals(parts[0])) {
// skip
String ip = parts[2];
String port = parts[3];
MasterSlaveEntry entry = getEntry(singleSlotRange);
entry.freeze();
String addr = ip + ":" + port;
log.info("master: {} has down", addr);
} else {
log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort());
}

@ -69,6 +69,19 @@ public class SubscribesConnectionEntry extends ConnectionEntry {
connectionsCounter.incrementAndGet();
}
public boolean freezeMaster(FreezeReason reason) {
synchronized (this) {
setFreezed(true);
// only RECONNECT freeze reason could be replaced
if (getFreezeReason() == null
|| getFreezeReason() == FreezeReason.RECONNECT) {
setFreezeReason(reason);
return true;
}
}
return false;
}
public Future<RedisPubSubConnection> connectPubSub(MasterSlaveServersConfig config) {
Future<RedisPubSubConnection> future = super.connectPubSub(config);
future.addListener(new FutureListener<RedisPubSubConnection>() {

@ -18,6 +18,7 @@ package org.redisson.misc;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -59,6 +60,17 @@ public class ConnectionPool<T extends RedisConnection> {
this.loadBalancer = loadBalancer;
this.masterSlaveEntry = masterSlaveEntry;
this.connectionManager = connectionManager;
// Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
//
// @Override
// public void run() {
// if (promises.size() > 0) {
// System.out.println("promises " + promises.size());
// }
//
// }
// }, 1, 1, TimeUnit.SECONDS);
}
public void add(final SubscribesConnectionEntry entry) {
@ -215,19 +227,13 @@ public class ConnectionPool<T extends RedisConnection> {
}
}
promises.add(promise);
// promise.tryFailure(cause);
// promises.add(promise);
promise.tryFailure(cause);
}
private void freezeMaster(SubscribesConnectionEntry entry) {
synchronized (entry) {
if (!entry.isFreezed()) {
entry.setFreezed(true);
if (entry.getFreezeReason() == null) {
entry.setFreezeReason(FreezeReason.RECONNECT);
}
scheduleCheck(entry);
}
if (entry.freezeMaster(FreezeReason.RECONNECT)) {
scheduleCheck(entry);
}
}
@ -248,9 +254,9 @@ public class ConnectionPool<T extends RedisConnection> {
releaseConnection(entry);
promises.add(promise);
// RedisConnectionException cause = new RedisConnectionException(conn + " is not active!");
// promise.tryFailure(cause);
// promises.add(promise);
RedisConnectionException cause = new RedisConnectionException(conn + " is not active!");
promise.tryFailure(cause);
}
private void scheduleCheck(final SubscribesConnectionEntry entry) {

Loading…
Cancel
Save