diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 7a952b933..2aeacd31e 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -16,6 +16,7 @@ package org.redisson; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Queue; @@ -205,38 +206,51 @@ public class CommandBatchExecutorService extends CommandExecutorService { final Promise attemptPromise = connectionManager.newPromise(); final AtomicReference ex = new AtomicReference(); + final Future connectionFuture; + if (entry.isReadOnlyMode()) { + connectionFuture = connectionManager.connectionReadOp(source, null); + } else { + connectionFuture = connectionManager.connectionWriteOp(source, null); + } + final TimerTask retryTimerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { if (attemptPromise.isDone()) { return; } + + connectionFuture.cancel(false); + if (attempt == connectionManager.getConfig().getRetryAttempts()) { attemptPromise.setFailure(ex.get()); return; } - attemptPromise.cancel(true); + if (!attemptPromise.cancel(false)) { + return; + } int count = attempt + 1; execute(entry, source, mainPromise, slots, count); } }; - Future connectionFuture; - if (entry.isReadOnlyMode()) { - connectionFuture = connectionManager.connectionReadOp(source, null); - } else { - connectionFuture = connectionManager.connectionWriteOp(source, null); - } + ex.set(new RedisTimeoutException("Batch command execution timeout")); + final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future connFuture) throws Exception { - if (attemptPromise.isCancelled()) { + if (attemptPromise.isDone() || connFuture.isCancelled()) { return; } if (!connFuture.isSuccess()) { - ex.set((RedisException)connFuture.cause()); + timeout.cancel(); + if (!connectionManager.getShutdownLatch().acquire()) { + return; + } + + ex.set(convertException(connFuture)); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); return; } @@ -247,16 +261,22 @@ public class CommandBatchExecutorService extends CommandExecutorService { for (CommandEntry c : entry.getCommands()) { list.add(c.getCommand()); } - ChannelFuture future = connection.send(new CommandsData(attemptPromise, list)); + ChannelFuture writeFuture = connection.send(new CommandsData(attemptPromise, list)); - ex.set(new RedisTimeoutException()); - final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); - future.addListener(new ChannelFutureListener() { + writeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { + if (attemptPromise.isDone() || future.isCancelled()) { + return; + } + if (!future.isSuccess()) { timeout.cancel(); + if (!connectionManager.getShutdownLatch().acquire()) { + return; + } + ex.set(new WriteRedisConnectionException("Can't write commands batch to channel: " + future.channel(), future.cause())); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); } @@ -274,21 +294,34 @@ public class CommandBatchExecutorService extends CommandExecutorService { attemptPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { + timeout.cancel(); if (future.isCancelled()) { return; } if (future.cause() instanceof RedisMovedException) { + if (!connectionManager.getShutdownLatch().acquire()) { + return; + } + RedisMovedException ex = (RedisMovedException)future.cause(); execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt); return; } if (future.cause() instanceof RedisAskException) { + if (!connectionManager.getShutdownLatch().acquire()) { + return; + } + RedisAskException ex = (RedisAskException)future.cause(); execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt); return; } if (future.cause() instanceof RedisLoadingException) { + if (!connectionManager.getShutdownLatch().acquire()) { + return; + } + execute(entry, source, mainPromise, slots, attempt); return; } diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index 7a728b4cf..2a50e237f 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -192,7 +192,7 @@ public class CommandExecutorService implements CommandExecutor { throw convertException(future); } - private RedisException convertException(Future future) { + protected RedisException convertException(Future future) { return future.cause() instanceof RedisException ? (RedisException) future.cause() : new RedisException("Unexpected exception while processing command", future.cause()); @@ -422,12 +422,22 @@ public class CommandExecutorService implements CommandExecutor { final Promise attemptPromise = connectionManager.newPromise(); final AtomicReference ex = new AtomicReference(); + final Future connectionFuture; + if (readOnlyMode) { + connectionFuture = connectionManager.connectionReadOp(source, command); + } else { + connectionFuture = connectionManager.connectionWriteOp(source, command); + } + final TimerTask retryTimerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { if (attemptPromise.isDone()) { return; } + + connectionFuture.cancel(false); + if (attempt == connectionManager.getConfig().getRetryAttempts()) { attemptPromise.tryFailure(ex.get()); return; @@ -444,17 +454,10 @@ public class CommandExecutorService implements CommandExecutor { ex.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params " + Arrays.toString(params))); final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); - final Future connectionFuture; - if (readOnlyMode) { - connectionFuture = connectionManager.connectionReadOp(source, command); - } else { - connectionFuture = connectionManager.connectionWriteOp(source, command); - } - connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future connFuture) throws Exception { - if (attemptPromise.isCancelled() || connFuture.isCancelled()) { + if (attemptPromise.isDone() || connFuture.isCancelled()) { return; } if (!connFuture.isSuccess()) { @@ -469,23 +472,23 @@ public class CommandExecutorService implements CommandExecutor { RedisConnection connection = connFuture.getNow(); - ChannelFuture future = null; + ChannelFuture writeFuture = null; if (source.getRedirect() == Redirect.ASK) { List> list = new ArrayList>(2); Promise promise = connectionManager.newPromise(); list.add(new CommandData(promise, codec, RedisCommands.ASKING, new Object[] {})); list.add(new CommandData(attemptPromise, messageDecoder, codec, command, params)); Promise main = connectionManager.newPromise(); - future = connection.send(new CommandsData(main, list)); + writeFuture = connection.send(new CommandsData(main, list)); } else { log.debug("getting connection for command {} from slot {} using node {}", command, source, connection.getRedisClient().getAddr()); - future = connection.send(new CommandData(attemptPromise, messageDecoder, codec, command, params)); + writeFuture = connection.send(new CommandData(attemptPromise, messageDecoder, codec, command, params)); } - future.addListener(new ChannelFutureListener() { + writeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if (attemptPromise.isCancelled() || future.isCancelled()) { + if (attemptPromise.isDone() || future.isCancelled()) { return; } if (!future.isSuccess()) { diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index 4250e8dec..7b5532fb7 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -68,34 +68,36 @@ abstract class BaseLoadBalancer implements LoadBalancer { return count; } - public synchronized boolean unfreeze(String host, int port, FreezeReason freezeReason) { + public boolean unfreeze(String host, int port, FreezeReason freezeReason) { InetSocketAddress addr = new InetSocketAddress(host, port); - for (SubscribesConnectionEntry connectionEntry : addr2Entry.values()) { - if (!connectionEntry.getClient().getAddr().equals(addr)) { - continue; - } - if (freezeReason == FreezeReason.RECONNECT - && connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) { - connectionEntry.setFreezed(false); - return true; + SubscribesConnectionEntry entry = addr2Entry.get(addr); + if (entry == null) { + throw new IllegalStateException("Can't find " + addr + " in slaves!"); + } + + synchronized (entry) { + if (!entry.isFreezed()) { + return false; } - if (freezeReason == FreezeReason.MANAGER) { - connectionEntry.setFreezed(false); + if ((freezeReason == FreezeReason.RECONNECT + && entry.getFreezeReason() == FreezeReason.RECONNECT) + || freezeReason != FreezeReason.RECONNECT) { + entry.setFreezed(false); + entry.setFreezeReason(null); return true; } - return false; } - throw new IllegalStateException("Can't find " + addr + " in slaves!"); + return false; } - public synchronized Collection freeze(String host, int port, FreezeReason freezeReason) { + public Collection freeze(String host, int port, FreezeReason freezeReason) { InetSocketAddress addr = new InetSocketAddress(host, port); - for (SubscribesConnectionEntry connectionEntry : addr2Entry.values()) { - if (connectionEntry.isFreezed() - || !connectionEntry.getClient().getAddr().equals(addr)) { - continue; - } + SubscribesConnectionEntry connectionEntry = addr2Entry.get(addr); + if (connectionEntry == null) { + return Collections.emptyList(); + } + synchronized (connectionEntry) { log.debug("{} freezed", addr); connectionEntry.setFreezed(true); // only RECONNECT freeze reason could be replaced @@ -103,32 +105,29 @@ abstract class BaseLoadBalancer implements LoadBalancer { || connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) { connectionEntry.setFreezeReason(freezeReason); } + } - // close all connections - while (true) { - RedisConnection connection = connectionEntry.pollConnection(); - if (connection == null) { - break; - } - connection.closeAsync(); + // close all connections + while (true) { + RedisConnection connection = connectionEntry.pollConnection(); + if (connection == null) { + break; } + connection.closeAsync(); + } - // close all pub/sub connections - while (true) { - RedisPubSubConnection connection = connectionEntry.pollFreeSubscribeConnection(); - if (connection == null) { - break; - } - connection.closeAsync(); + // close all pub/sub connections + while (true) { + RedisPubSubConnection connection = connectionEntry.pollFreeSubscribeConnection(); + if (connection == null) { + break; } - - - List list = new ArrayList(connectionEntry.getAllSubscribeConnections()); - connectionEntry.getAllSubscribeConnections().clear(); - return list; + connection.closeAsync(); } - return Collections.emptyList(); + List list = new ArrayList(connectionEntry.getAllSubscribeConnections()); + connectionEntry.getAllSubscribeConnections().clear(); + return list; } public Future nextPubSubConnection() { diff --git a/src/main/java/org/redisson/connection/ConnectionEntry.java b/src/main/java/org/redisson/connection/ConnectionEntry.java index f7acd33f4..ce8c16d2f 100644 --- a/src/main/java/org/redisson/connection/ConnectionEntry.java +++ b/src/main/java/org/redisson/connection/ConnectionEntry.java @@ -35,7 +35,7 @@ public class ConnectionEntry { final Logger log = LoggerFactory.getLogger(getClass()); - public enum FreezeReason {MANAGER, RECONNECT} + public enum FreezeReason {MANAGER, RECONNECT, SYSTEM} private volatile boolean freezed; private FreezeReason freezeReason; @@ -127,6 +127,7 @@ public class ConnectionEntry { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { + connectionFuture.tryFailure(future.cause()); return; } RedisConnection conn = future.getNow(); @@ -161,6 +162,7 @@ public class ConnectionEntry { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { + connectionFuture.tryFailure(future.cause()); return; } RedisPubSubConnection conn = future.getNow(); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 0fc529cbb..612b40e7a 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -176,6 +176,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + if (future.isCancelled()) { + return; + } + if (!future.isSuccess()) { conn.incFailAttempt(); } else { @@ -195,6 +199,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + if (future.isCancelled()) { + return; + } + if (!future.isSuccess()) { conn.incFailAttempt(); } else { @@ -492,6 +500,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { public void slaveDown(MasterSlaveEntry entry, String host, int port, FreezeReason freezeReason) { Collection allPubSubConnections = entry.slaveDown(host, port, freezeReason); + if (allPubSubConnections.isEmpty()) { + return; + } // reattach listeners to other channels for (Entry mapEntry : name2PubSubConnection.entrySet()) { diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 06ce6ec80..69e44bd3c 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -90,7 +90,7 @@ public class MasterSlaveEntry { // add master as slave if no more slaves available if (slaveBalancer.getAvailableClients() == 0) { InetSocketAddress addr = masterEntry.getClient().getAddr(); - slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.MANAGER); + slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM); log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort()); } return conns; @@ -105,7 +105,10 @@ public class MasterSlaveEntry { SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client, this.config.getSlaveConnectionPoolSize(), this.config.getSlaveSubscriptionConnectionPoolSize(), connectListener, mode); - entry.setFreezed(freezed); + if (freezed) { + entry.setFreezed(freezed); + entry.setFreezeReason(FreezeReason.SYSTEM); + } slaveBalancer.add(entry); } @@ -118,8 +121,9 @@ public class MasterSlaveEntry { return; } InetSocketAddress addr = masterEntry.getClient().getAddr(); + // exclude master from slaves if (!addr.getHostName().equals(host) || port != addr.getPort()) { - connectionManager.slaveDown(this, addr.getHostName(), addr.getPort(), FreezeReason.MANAGER); + connectionManager.slaveDown(this, addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM); } } @@ -134,8 +138,8 @@ public class MasterSlaveEntry { setupMasterEntry(host, port); writeConnectionHolder.remove(oldMaster); if (slaveBalancer.getAvailableClients() > 1) { - // more than one slave avaliable, so master could be removed from slaves - connectionManager.slaveDown(this, host, port, FreezeReason.MANAGER); + // more than one slave available, so master could be removed from slaves + connectionManager.slaveDown(this, host, port, FreezeReason.SYSTEM); } connectionManager.shutdownAsync(oldMaster.getClient()); } diff --git a/src/main/java/org/redisson/misc/ConnectionPool.java b/src/main/java/org/redisson/misc/ConnectionPool.java index 02bef6fce..e16340699 100644 --- a/src/main/java/org/redisson/misc/ConnectionPool.java +++ b/src/main/java/org/redisson/misc/ConnectionPool.java @@ -15,9 +15,10 @@ */ package org.redisson.misc; +import java.util.Deque; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -42,7 +43,7 @@ public class ConnectionPool { final List entries = new CopyOnWriteArrayList(); - final ConcurrentLinkedQueue> promises = new ConcurrentLinkedQueue>(); + final Deque> promises = new LinkedBlockingDeque>(); final ConnectionManager connectionManager; @@ -61,18 +62,26 @@ public class ConnectionPool { } public void add(final SubscribesConnectionEntry entry) { - // is it a master connection pool? + initConnections(entry, new Runnable() { + @Override + public void run() { + entries.add(entry); + handleQueue(entry, true); + } + }, true); + } + + private void initConnections(final SubscribesConnectionEntry entry, final Runnable runnable, boolean checkFreezed) { int minimumIdleSize = getMinimumIdleSize(entry); if (minimumIdleSize == 0) { - entries.add(entry); - handleQueue(entry); + runnable.run(); return; } final AtomicInteger completedConnections = new AtomicInteger(minimumIdleSize); for (int i = 0; i < minimumIdleSize; i++) { - if (entry.isFreezed() || !tryAcquireConnection(entry)) { + if ((checkFreezed && entry.isFreezed()) || !tryAcquireConnection(entry)) { continue; } @@ -88,8 +97,7 @@ public class ConnectionPool { releaseConnection(entry); if (completedConnections.decrementAndGet() == 0) { - entries.add(entry); - handleQueue(entry); + runnable.run(); } } }); @@ -98,6 +106,7 @@ public class ConnectionPool { protected int getMinimumIdleSize(SubscribesConnectionEntry entry) { int minimumIdleSize = config.getSlaveConnectionMinimumIdleSize(); + // is it a master connection pool? if (entry.getNodeType() == NodeType.MASTER && loadBalancer == null) { minimumIdleSize = config.getMasterConnectionMinimumIdleSize(); } @@ -129,7 +138,7 @@ public class ConnectionPool { } public Future get(SubscribesConnectionEntry entry) { - if ((entry.getNodeType() == NodeType.MASTER || !entry.isFreezed()) + if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed()) && tryAcquireConnection(entry)) { Promise promise = connectionManager.newPromise(); connect(entry, promise); @@ -196,44 +205,59 @@ public class ConnectionPool { } private void promiseFailure(SubscribesConnectionEntry entry, Promise promise, Throwable cause) { - if (entry.incFailedAttempts() == config.getSlaveFailedAttempts() - && entry.getNodeType() == NodeType.SLAVE) { - connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(), - entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); - scheduleCheck(entry); + if (entry.incFailedAttempts() == config.getSlaveFailedAttempts()) { + if (entry.getNodeType() == NodeType.SLAVE) { + connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(), + entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); + scheduleCheck(entry); + } else { + freezeMaster(entry); + } } - promise.setFailure(cause); + promises.addFirst(promise); +// promise.tryFailure(cause); + } + + private void freezeMaster(SubscribesConnectionEntry entry) { + synchronized (entry) { + if (!entry.isFreezed()) { + entry.setFreezed(true); + if (entry.getFreezeReason() == null) { + entry.setFreezeReason(FreezeReason.RECONNECT); + } + scheduleCheck(entry); + } + } } private void promiseFailure(SubscribesConnectionEntry entry, Promise promise, T conn) { int attempts = entry.incFailedAttempts(); - if (entry.getNodeType() == NodeType.SLAVE) { - if (attempts == config.getSlaveFailedAttempts()) { + if (attempts == config.getSlaveFailedAttempts()) { + if (entry.getNodeType() == NodeType.SLAVE) { connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); scheduleCheck(entry); - } else if (attempts < config.getSlaveFailedAttempts()) { - releaseConnection(entry, conn); } else { - conn.closeAsync(); + freezeMaster(entry); } - } else { + } else if (attempts < config.getSlaveFailedAttempts()) { releaseConnection(entry, conn); } releaseConnection(entry); - RedisConnectionException cause = new RedisConnectionException(conn + " is not active!"); - promise.setFailure(cause); + promises.addFirst(promise); +// RedisConnectionException cause = new RedisConnectionException(conn + " is not active!"); +// promise.tryFailure(cause); } private void scheduleCheck(final SubscribesConnectionEntry entry) { connectionManager.getTimer().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - if (entry.getFreezeReason() == FreezeReason.MANAGER + if (entry.getFreezeReason() != FreezeReason.RECONNECT || !entry.isFreezed()) { return; } @@ -242,7 +266,7 @@ public class ConnectionPool { connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - if (entry.getFreezeReason() == FreezeReason.MANAGER + if (entry.getFreezeReason() != FreezeReason.RECONNECT || !entry.isFreezed()) { return; } @@ -258,14 +282,32 @@ public class ConnectionPool { @Override public void operationComplete(Future future) throws Exception { try { - if (entry.getFreezeReason() == FreezeReason.MANAGER + if (entry.getFreezeReason() != FreezeReason.RECONNECT || !entry.isFreezed()) { return; } if (future.isSuccess() && "PONG".equals(future.getNow())) { entry.resetFailedAttempts(); - masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); + initConnections(entry, new Runnable() { + @Override + public void run() { + if (entry.getNodeType() == NodeType.SLAVE) { + handleQueue(entry, false); + masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); + } else { + synchronized (entry) { + if (entry.getFreezeReason() == FreezeReason.RECONNECT) { + handleQueue(entry, false); + + entry.setFreezed(false); + entry.setFreezeReason(null); + } + } + } + } + }, false); + } else { scheduleCheck(entry); } @@ -299,17 +341,28 @@ public class ConnectionPool { protected void releaseConnection(SubscribesConnectionEntry entry) { entry.releaseConnection(); - handleQueue(entry); + handleQueue(entry, true); } - private void handleQueue(SubscribesConnectionEntry entry) { - Promise promise = promises.poll(); - if (promise != null) { - if (!entry.isFreezed() && tryAcquireConnection(entry)) { - connect(entry, promise); + private void handleQueue(SubscribesConnectionEntry entry, boolean checkFreezed) { + while (true) { + if (checkFreezed && entry.isFreezed()) { + return; + } + Promise promise = promises.poll(); + if (promise == null) { + return; + } + if (promise.isCancelled()) { + continue; + } + + if (!tryAcquireConnection(entry)) { + promises.addFirst(promise); } else { - promises.add(promise); + connect(entry, promise); } + return; } }