diff --git a/src/main/java/org/redisson/BaseConfig.java b/src/main/java/org/redisson/BaseConfig.java index 30b3e9e89..a6f70f4dd 100644 --- a/src/main/java/org/redisson/BaseConfig.java +++ b/src/main/java/org/redisson/BaseConfig.java @@ -26,7 +26,7 @@ class BaseConfig> { private int pingTimeout = 1000; /** - * Connect timeout used for any Redis server connection. + * This timeout used during connection establishment to any Redis server. * Value in milliseconds. * */ @@ -37,9 +37,9 @@ class BaseConfig> { * Then amount is reached exception will be thrown in case of sync operation usage * or Future callback fails in case of async operation. */ - private int timeout = 60000; + private int timeout = 1000; - private int retryAttempts = 20; + private int retryAttempts = 3; private int retryInterval = 1000; @@ -128,7 +128,7 @@ class BaseConfig> { } /** - * Time pause before next reconnection attempt. + * Time pause before next command attempt. * * Used then connection with redis server is down. * @@ -216,7 +216,7 @@ class BaseConfig> { } /** - * Connect timeout used for any Redis server connection. + * This timeout used during connection establishment to any Redis server. * * @param connectTimeout - timeout in milliseconds * @return diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 19e5a482e..3ac09296e 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -246,7 +246,9 @@ public class CommandBatchExecutorService extends CommandExecutorService { }; ex.set(new RedisTimeoutException("Batch command execution timeout")); - final Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); + final AtomicReference timeoutRef = new AtomicReference(); + Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); + timeoutRef.set(timeout); connectionFuture.addListener(new FutureListener() { @Override @@ -255,10 +257,10 @@ public class CommandBatchExecutorService extends CommandExecutorService { return; } if (!connFuture.isSuccess()) { - timeout.cancel(); - ex.set(convertException(connFuture)); - connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + if (timeoutRef.get().cancel()) { + connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + } return; } @@ -279,17 +281,18 @@ public class CommandBatchExecutorService extends CommandExecutorService { } if (!future.isSuccess()) { - timeout.cancel(); ex.set(new WriteRedisConnectionException("Can't write commands batch to channel: " + future.channel(), future.cause())); - connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + if (timeoutRef.get().cancel()) { + connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + } } } }); if (entry.isReadOnlyMode()) { - attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeout)); + attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeoutRef)); } else { - attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeout)); + attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeoutRef)); } } }); @@ -297,7 +300,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { attemptPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - timeout.cancel(); + timeoutRef.get().cancel(); if (future.isCancelled() || mainPromise.isCancelled()) { return; } diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index adac5b1bd..89c50ce34 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -424,7 +424,11 @@ public class CommandExecutorService implements CommandExecutor { } final Promise attemptPromise = connectionManager.newPromise(); - final AtomicReference ex = new AtomicReference(); + + final AtomicReference writeFutureRef = new AtomicReference(); + final AtomicReference exceptionRef = new AtomicReference(); + final AtomicReference timeoutRef = new AtomicReference(); + final Future connectionFuture; if (readOnlyMode) { @@ -440,6 +444,17 @@ public class CommandExecutorService implements CommandExecutor { connectionManager.getShutdownLatch().release(); } + if ((writeFutureRef.get() == null || !writeFutureRef.get().isDone()) + && connectionFuture.isSuccess()) { + Timeout newTimeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + timeoutRef.set(newTimeout); + return; + } + + if (writeFutureRef.get() != null && writeFutureRef.get().isSuccess()) { + return; + } + if (attemptPromise.isDone()) { return; } @@ -450,7 +465,7 @@ public class CommandExecutorService implements CommandExecutor { } if (attempt == connectionManager.getConfig().getRetryAttempts()) { - attemptPromise.tryFailure(ex.get()); + attemptPromise.tryFailure(exceptionRef.get()); return; } if (!attemptPromise.cancel(false)) { @@ -462,56 +477,69 @@ public class CommandExecutorService implements CommandExecutor { } }; - ex.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params " + Arrays.toString(params) + " attempt " + attempt)); - final Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); + exceptionRef.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params " + Arrays.toString(params))); + Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + timeoutRef.set(timeout); connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future connFuture) throws Exception { - if (attemptPromise.isDone() || connFuture.isCancelled() || mainPromise.isCancelled()) { + if (attemptPromise.isDone() || connFuture.isCancelled() || mainPromise.isCancelled() || timeoutRef.get().isExpired()) { return; } + if (!connFuture.isSuccess()) { - timeout.cancel(); - ex.set(convertException(connFuture)); - connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + exceptionRef.set(convertException(connFuture)); return; } RedisConnection connection = connFuture.getNow(); - ChannelFuture writeFuture = null; if (source.getRedirect() == Redirect.ASK) { List> list = new ArrayList>(2); Promise promise = connectionManager.newPromise(); list.add(new CommandData(promise, codec, RedisCommands.ASKING, new Object[] {})); list.add(new CommandData(attemptPromise, messageDecoder, codec, command, params)); Promise main = connectionManager.newPromise(); - writeFuture = connection.send(new CommandsData(main, list)); + ChannelFuture future = connection.send(new CommandsData(main, list)); + writeFutureRef.set(future); } else { log.debug("getting connection for command {} from slot {} using node {}", command, source, connection.getRedisClient().getAddr()); - writeFuture = connection.send(new CommandData(attemptPromise, messageDecoder, codec, command, params)); + ChannelFuture future = connection.send(new CommandData(attemptPromise, messageDecoder, codec, command, params)); + writeFutureRef.set(future); } - writeFuture.addListener(new ChannelFutureListener() { + writeFutureRef.get().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if (attemptPromise.isDone() || future.isCancelled() || mainPromise.isCancelled()) { + if (attemptPromise.isDone() || mainPromise.isCancelled()) { return; } if (!future.isSuccess()) { - timeout.cancel(); - ex.set(new WriteRedisConnectionException( + exceptionRef.set(new WriteRedisConnectionException( "Can't write command: " + command + ", params: " + params + " to channel: " + future.channel(), future.cause())); - connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + } else { + timeoutRef.get().cancel(); + TimerTask timeoutTask = new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (attemptPromise.isDone()) { + return; + } + + attemptPromise.tryFailure(exceptionRef.get()); + } + }; + Timeout timeout = connectionManager.newTimeout(timeoutTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); + timeoutRef.set(timeout); } } }); if (readOnlyMode) { - attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeout)); + attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeoutRef)); } else { - attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeout)); + attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeoutRef)); } } }); @@ -519,28 +547,25 @@ public class CommandExecutorService implements CommandExecutor { attemptPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - timeout.cancel(); + timeoutRef.get().cancel(); if (future.isCancelled() || mainPromise.isCancelled()) { return; } if (future.cause() instanceof RedisMovedException) { RedisMovedException ex = (RedisMovedException)future.cause(); - connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), messageDecoder, codec, command, params, mainPromise, attempt); + async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), messageDecoder, codec, command, params, mainPromise, 0); return; } if (future.cause() instanceof RedisAskException) { RedisAskException ex = (RedisAskException)future.cause(); - connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), messageDecoder, codec, command, params, mainPromise, attempt); + async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), messageDecoder, codec, command, params, mainPromise, 0); return; } if (future.cause() instanceof RedisLoadingException) { - connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, attempt); + async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, 0); return; } diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index ac11e6376..7bd9e1b18 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import org.redisson.client.RedisAskException; -import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; import org.redisson.client.RedisLoadingException; import org.redisson.client.RedisMovedException; diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index d18197022..f8b50fdc3 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -19,6 +19,7 @@ import java.net.InetSocketAddress; import java.util.Collection; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; @@ -72,10 +73,10 @@ public interface ConnectionManager { Future connectionWriteOp(NodeSource source, RedisCommand command); FutureListener createReleaseReadListener(NodeSource source, - RedisConnection conn, Timeout timeout); + RedisConnection conn, AtomicReference timeout); FutureListener createReleaseWriteListener(NodeSource source, - RedisConnection conn, Timeout timeout); + RedisConnection conn, AtomicReference timeout); RedisClient createClient(String host, int port, int timeout); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index db0ee8cd9..e4f2fa56e 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -16,6 +16,7 @@ package org.redisson.connection; import java.net.InetSocketAddress; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -24,6 +25,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.redisson.Config; import org.redisson.MasterSlaveServersConfig; @@ -147,12 +149,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected void init(MasterSlaveServersConfig config) { this.config = config; - int minTimeout = Math.min(config.getRetryInterval(), config.getTimeout()); + int[] timeouts = new int[] {config.getRetryInterval(), config.getTimeout(), config.getSlaveReconnectionTimeout()}; + Arrays.sort(timeouts); + int minTimeout = timeouts[0]; if (minTimeout % 100 != 0) { - timer = new HashedWheelTimer((minTimeout % 100) / 2, TimeUnit.MILLISECONDS); + minTimeout = (minTimeout % 100) / 2; + } else if (minTimeout == 100) { + minTimeout = 50; } else { - timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS); + minTimeout = 100; } + timer = new HashedWheelTimer(minTimeout, TimeUnit.MILLISECONDS); initEntry(config); } @@ -195,7 +202,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public FutureListener createReleaseWriteListener(final NodeSource source, - final RedisConnection conn, final Timeout timeout) { + final RedisConnection conn, final AtomicReference timeout) { return new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { @@ -210,7 +217,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } shutdownLatch.release(); - timeout.cancel(); + timeout.get().cancel(); releaseWrite(source, conn); } }; @@ -218,7 +225,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public FutureListener createReleaseReadListener(final NodeSource source, - final RedisConnection conn, final Timeout timeout) { + final RedisConnection conn, final AtomicReference timeout) { return new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { @@ -233,7 +240,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } shutdownLatch.release(); - timeout.cancel(); + timeout.get().cancel(); releaseRead(source, conn); } }; diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 69e44bd3c..2b584b687 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -141,9 +141,14 @@ public class MasterSlaveEntry { // more than one slave available, so master could be removed from slaves connectionManager.slaveDown(this, host, port, FreezeReason.SYSTEM); } + oldMaster.freezeMaster(FreezeReason.MANAGER); connectionManager.shutdownAsync(oldMaster.getClient()); } + public void freeze() { + masterEntry.freezeMaster(FreezeReason.MANAGER); + } + public void shutdownMasterAsync() { if (!active.compareAndSet(true, false)) { return; diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index defdaba71..11a4a0b2e 100755 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -154,7 +154,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { onSlaveAdded(addr, msg); } if ("+sdown".equals(channel)) { - onSlaveDown(addr, msg); + onNodeDown(addr, msg); } if ("-sdown".equals(channel)) { onSlaveUp(addr, msg); @@ -212,7 +212,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } } - private void onSlaveDown(URI sentinelAddr, String msg) { + private void onNodeDown(URI sentinelAddr, String msg) { String[] parts = msg.split(" "); if (parts.length > 3) { @@ -232,7 +232,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { log.info("sentinel: {} has down", addr); } } else if ("master".equals(parts[0])) { - // skip + String ip = parts[2]; + String port = parts[3]; + + MasterSlaveEntry entry = getEntry(singleSlotRange); + entry.freeze(); + String addr = ip + ":" + port; + log.info("master: {} has down", addr); } else { log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort()); } diff --git a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java index 136ad4638..56b4a79a1 100644 --- a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java +++ b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java @@ -69,6 +69,19 @@ public class SubscribesConnectionEntry extends ConnectionEntry { connectionsCounter.incrementAndGet(); } + public boolean freezeMaster(FreezeReason reason) { + synchronized (this) { + setFreezed(true); + // only RECONNECT freeze reason could be replaced + if (getFreezeReason() == null + || getFreezeReason() == FreezeReason.RECONNECT) { + setFreezeReason(reason); + return true; + } + } + return false; + } + public Future connectPubSub(MasterSlaveServersConfig config) { Future future = super.connectPubSub(config); future.addListener(new FutureListener() { diff --git a/src/main/java/org/redisson/misc/ConnectionPool.java b/src/main/java/org/redisson/misc/ConnectionPool.java index d47db3d1d..55793dac4 100644 --- a/src/main/java/org/redisson/misc/ConnectionPool.java +++ b/src/main/java/org/redisson/misc/ConnectionPool.java @@ -18,6 +18,7 @@ package org.redisson.misc; import java.util.Deque; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -59,6 +60,17 @@ public class ConnectionPool { this.loadBalancer = loadBalancer; this.masterSlaveEntry = masterSlaveEntry; this.connectionManager = connectionManager; + +// Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() { +// +// @Override +// public void run() { +// if (promises.size() > 0) { +// System.out.println("promises " + promises.size()); +// } +// +// } +// }, 1, 1, TimeUnit.SECONDS); } public void add(final SubscribesConnectionEntry entry) { @@ -215,19 +227,13 @@ public class ConnectionPool { } } - promises.add(promise); -// promise.tryFailure(cause); +// promises.add(promise); + promise.tryFailure(cause); } private void freezeMaster(SubscribesConnectionEntry entry) { - synchronized (entry) { - if (!entry.isFreezed()) { - entry.setFreezed(true); - if (entry.getFreezeReason() == null) { - entry.setFreezeReason(FreezeReason.RECONNECT); - } - scheduleCheck(entry); - } + if (entry.freezeMaster(FreezeReason.RECONNECT)) { + scheduleCheck(entry); } } @@ -248,9 +254,9 @@ public class ConnectionPool { releaseConnection(entry); - promises.add(promise); -// RedisConnectionException cause = new RedisConnectionException(conn + " is not active!"); -// promise.tryFailure(cause); +// promises.add(promise); + RedisConnectionException cause = new RedisConnectionException(conn + " is not active!"); + promise.tryFailure(cause); } private void scheduleCheck(final SubscribesConnectionEntry entry) {