connectAsync bug fixed & disconnected master case handling. #262

pull/297/head
Nikita 9 years ago
parent b78c9df54d
commit 3614ca9c63

@ -16,6 +16,7 @@
package org.redisson; package org.redisson;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
@ -205,38 +206,51 @@ public class CommandBatchExecutorService extends CommandExecutorService {
final Promise<Void> attemptPromise = connectionManager.newPromise(); final Promise<Void> attemptPromise = connectionManager.newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>(); final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();
final Future<RedisConnection> connectionFuture;
if (entry.isReadOnlyMode()) {
connectionFuture = connectionManager.connectionReadOp(source, null);
} else {
connectionFuture = connectionManager.connectionWriteOp(source, null);
}
final TimerTask retryTimerTask = new TimerTask() { final TimerTask retryTimerTask = new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
if (attemptPromise.isDone()) { if (attemptPromise.isDone()) {
return; return;
} }
connectionFuture.cancel(false);
if (attempt == connectionManager.getConfig().getRetryAttempts()) { if (attempt == connectionManager.getConfig().getRetryAttempts()) {
attemptPromise.setFailure(ex.get()); attemptPromise.setFailure(ex.get());
return; return;
} }
attemptPromise.cancel(true); if (!attemptPromise.cancel(false)) {
return;
}
int count = attempt + 1; int count = attempt + 1;
execute(entry, source, mainPromise, slots, count); execute(entry, source, mainPromise, slots, count);
} }
}; };
Future<RedisConnection> connectionFuture; ex.set(new RedisTimeoutException("Batch command execution timeout"));
if (entry.isReadOnlyMode()) { final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
connectionFuture = connectionManager.connectionReadOp(source, null);
} else {
connectionFuture = connectionManager.connectionWriteOp(source, null);
}
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception { public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (attemptPromise.isCancelled()) { if (attemptPromise.isDone() || connFuture.isCancelled()) {
return; return;
} }
if (!connFuture.isSuccess()) { if (!connFuture.isSuccess()) {
ex.set((RedisException)connFuture.cause()); timeout.cancel();
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
ex.set(convertException(connFuture));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
return; return;
} }
@ -247,16 +261,22 @@ public class CommandBatchExecutorService extends CommandExecutorService {
for (CommandEntry c : entry.getCommands()) { for (CommandEntry c : entry.getCommands()) {
list.add(c.getCommand()); list.add(c.getCommand());
} }
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list)); ChannelFuture writeFuture = connection.send(new CommandsData(attemptPromise, list));
ex.set(new RedisTimeoutException());
final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
future.addListener(new ChannelFutureListener() { writeFuture.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (attemptPromise.isDone() || future.isCancelled()) {
return;
}
if (!future.isSuccess()) { if (!future.isSuccess()) {
timeout.cancel(); timeout.cancel();
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
ex.set(new WriteRedisConnectionException("Can't write commands batch to channel: " + future.channel(), future.cause())); ex.set(new WriteRedisConnectionException("Can't write commands batch to channel: " + future.channel(), future.cause()));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
} }
@ -274,21 +294,34 @@ public class CommandBatchExecutorService extends CommandExecutorService {
attemptPromise.addListener(new FutureListener<Void>() { attemptPromise.addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
timeout.cancel();
if (future.isCancelled()) { if (future.isCancelled()) {
return; return;
} }
if (future.cause() instanceof RedisMovedException) { if (future.cause() instanceof RedisMovedException) {
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
RedisMovedException ex = (RedisMovedException)future.cause(); RedisMovedException ex = (RedisMovedException)future.cause();
execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt); execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt);
return; return;
} }
if (future.cause() instanceof RedisAskException) { if (future.cause() instanceof RedisAskException) {
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
RedisAskException ex = (RedisAskException)future.cause(); RedisAskException ex = (RedisAskException)future.cause();
execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt); execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt);
return; return;
} }
if (future.cause() instanceof RedisLoadingException) { if (future.cause() instanceof RedisLoadingException) {
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
execute(entry, source, mainPromise, slots, attempt); execute(entry, source, mainPromise, slots, attempt);
return; return;
} }

@ -192,7 +192,7 @@ public class CommandExecutorService implements CommandExecutor {
throw convertException(future); throw convertException(future);
} }
private <V> RedisException convertException(Future<V> future) { protected <V> RedisException convertException(Future<V> future) {
return future.cause() instanceof RedisException ? return future.cause() instanceof RedisException ?
(RedisException) future.cause() : (RedisException) future.cause() :
new RedisException("Unexpected exception while processing command", future.cause()); new RedisException("Unexpected exception while processing command", future.cause());
@ -422,12 +422,22 @@ public class CommandExecutorService implements CommandExecutor {
final Promise<R> attemptPromise = connectionManager.newPromise(); final Promise<R> attemptPromise = connectionManager.newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>(); final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();
final Future<RedisConnection> connectionFuture;
if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(source, command);
} else {
connectionFuture = connectionManager.connectionWriteOp(source, command);
}
final TimerTask retryTimerTask = new TimerTask() { final TimerTask retryTimerTask = new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
if (attemptPromise.isDone()) { if (attemptPromise.isDone()) {
return; return;
} }
connectionFuture.cancel(false);
if (attempt == connectionManager.getConfig().getRetryAttempts()) { if (attempt == connectionManager.getConfig().getRetryAttempts()) {
attemptPromise.tryFailure(ex.get()); attemptPromise.tryFailure(ex.get());
return; return;
@ -444,17 +454,10 @@ public class CommandExecutorService implements CommandExecutor {
ex.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params " + Arrays.toString(params))); ex.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params " + Arrays.toString(params)));
final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
final Future<RedisConnection> connectionFuture;
if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(source, command);
} else {
connectionFuture = connectionManager.connectionWriteOp(source, command);
}
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception { public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (attemptPromise.isCancelled() || connFuture.isCancelled()) { if (attemptPromise.isDone() || connFuture.isCancelled()) {
return; return;
} }
if (!connFuture.isSuccess()) { if (!connFuture.isSuccess()) {
@ -469,23 +472,23 @@ public class CommandExecutorService implements CommandExecutor {
RedisConnection connection = connFuture.getNow(); RedisConnection connection = connFuture.getNow();
ChannelFuture future = null; ChannelFuture writeFuture = null;
if (source.getRedirect() == Redirect.ASK) { if (source.getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2); List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
Promise<Void> promise = connectionManager.newPromise(); Promise<Void> promise = connectionManager.newPromise();
list.add(new CommandData<Void, Void>(promise, codec, RedisCommands.ASKING, new Object[] {})); list.add(new CommandData<Void, Void>(promise, codec, RedisCommands.ASKING, new Object[] {}));
list.add(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params)); list.add(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
Promise<Void> main = connectionManager.newPromise(); Promise<Void> main = connectionManager.newPromise();
future = connection.send(new CommandsData(main, list)); writeFuture = connection.send(new CommandsData(main, list));
} else { } else {
log.debug("getting connection for command {} from slot {} using node {}", command, source, connection.getRedisClient().getAddr()); log.debug("getting connection for command {} from slot {} using node {}", command, source, connection.getRedisClient().getAddr());
future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params)); writeFuture = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
} }
future.addListener(new ChannelFutureListener() { writeFuture.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (attemptPromise.isCancelled() || future.isCancelled()) { if (attemptPromise.isDone() || future.isCancelled()) {
return; return;
} }
if (!future.isSuccess()) { if (!future.isSuccess()) {

@ -68,34 +68,36 @@ abstract class BaseLoadBalancer implements LoadBalancer {
return count; return count;
} }
public synchronized boolean unfreeze(String host, int port, FreezeReason freezeReason) { public boolean unfreeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port); InetSocketAddress addr = new InetSocketAddress(host, port);
for (SubscribesConnectionEntry connectionEntry : addr2Entry.values()) { SubscribesConnectionEntry entry = addr2Entry.get(addr);
if (!connectionEntry.getClient().getAddr().equals(addr)) { if (entry == null) {
continue; throw new IllegalStateException("Can't find " + addr + " in slaves!");
} }
if (freezeReason == FreezeReason.RECONNECT
&& connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) { synchronized (entry) {
connectionEntry.setFreezed(false); if (!entry.isFreezed()) {
return true; return false;
} }
if (freezeReason == FreezeReason.MANAGER) { if ((freezeReason == FreezeReason.RECONNECT
connectionEntry.setFreezed(false); && entry.getFreezeReason() == FreezeReason.RECONNECT)
|| freezeReason != FreezeReason.RECONNECT) {
entry.setFreezed(false);
entry.setFreezeReason(null);
return true; return true;
} }
return false;
} }
throw new IllegalStateException("Can't find " + addr + " in slaves!"); return false;
} }
public synchronized Collection<RedisPubSubConnection> freeze(String host, int port, FreezeReason freezeReason) { public Collection<RedisPubSubConnection> freeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port); InetSocketAddress addr = new InetSocketAddress(host, port);
for (SubscribesConnectionEntry connectionEntry : addr2Entry.values()) { SubscribesConnectionEntry connectionEntry = addr2Entry.get(addr);
if (connectionEntry.isFreezed() if (connectionEntry == null) {
|| !connectionEntry.getClient().getAddr().equals(addr)) { return Collections.emptyList();
continue; }
}
synchronized (connectionEntry) {
log.debug("{} freezed", addr); log.debug("{} freezed", addr);
connectionEntry.setFreezed(true); connectionEntry.setFreezed(true);
// only RECONNECT freeze reason could be replaced // only RECONNECT freeze reason could be replaced
@ -103,32 +105,29 @@ abstract class BaseLoadBalancer implements LoadBalancer {
|| connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) { || connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) {
connectionEntry.setFreezeReason(freezeReason); connectionEntry.setFreezeReason(freezeReason);
} }
}
// close all connections // close all connections
while (true) { while (true) {
RedisConnection connection = connectionEntry.pollConnection(); RedisConnection connection = connectionEntry.pollConnection();
if (connection == null) { if (connection == null) {
break; break;
}
connection.closeAsync();
} }
connection.closeAsync();
}
// close all pub/sub connections // close all pub/sub connections
while (true) { while (true) {
RedisPubSubConnection connection = connectionEntry.pollFreeSubscribeConnection(); RedisPubSubConnection connection = connectionEntry.pollFreeSubscribeConnection();
if (connection == null) { if (connection == null) {
break; break;
}
connection.closeAsync();
} }
connection.closeAsync();
List<RedisPubSubConnection> list = new ArrayList<RedisPubSubConnection>(connectionEntry.getAllSubscribeConnections());
connectionEntry.getAllSubscribeConnections().clear();
return list;
} }
return Collections.emptyList(); List<RedisPubSubConnection> list = new ArrayList<RedisPubSubConnection>(connectionEntry.getAllSubscribeConnections());
connectionEntry.getAllSubscribeConnections().clear();
return list;
} }
public Future<RedisPubSubConnection> nextPubSubConnection() { public Future<RedisPubSubConnection> nextPubSubConnection() {

@ -35,7 +35,7 @@ public class ConnectionEntry {
final Logger log = LoggerFactory.getLogger(getClass()); final Logger log = LoggerFactory.getLogger(getClass());
public enum FreezeReason {MANAGER, RECONNECT} public enum FreezeReason {MANAGER, RECONNECT, SYSTEM}
private volatile boolean freezed; private volatile boolean freezed;
private FreezeReason freezeReason; private FreezeReason freezeReason;
@ -127,6 +127,7 @@ public class ConnectionEntry {
@Override @Override
public void operationComplete(Future<RedisConnection> future) throws Exception { public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
connectionFuture.tryFailure(future.cause());
return; return;
} }
RedisConnection conn = future.getNow(); RedisConnection conn = future.getNow();
@ -161,6 +162,7 @@ public class ConnectionEntry {
@Override @Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception { public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
connectionFuture.tryFailure(future.cause());
return; return;
} }
RedisPubSubConnection conn = future.getNow(); RedisPubSubConnection conn = future.getNow();

@ -176,6 +176,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return new FutureListener<T>() { return new FutureListener<T>() {
@Override @Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception { public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
if (future.isCancelled()) {
return;
}
if (!future.isSuccess()) { if (!future.isSuccess()) {
conn.incFailAttempt(); conn.incFailAttempt();
} else { } else {
@ -195,6 +199,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return new FutureListener<T>() { return new FutureListener<T>() {
@Override @Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception { public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
if (future.isCancelled()) {
return;
}
if (!future.isSuccess()) { if (!future.isSuccess()) {
conn.incFailAttempt(); conn.incFailAttempt();
} else { } else {
@ -492,6 +500,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
public void slaveDown(MasterSlaveEntry entry, String host, int port, FreezeReason freezeReason) { public void slaveDown(MasterSlaveEntry entry, String host, int port, FreezeReason freezeReason) {
Collection<RedisPubSubConnection> allPubSubConnections = entry.slaveDown(host, port, freezeReason); Collection<RedisPubSubConnection> allPubSubConnections = entry.slaveDown(host, port, freezeReason);
if (allPubSubConnections.isEmpty()) {
return;
}
// reattach listeners to other channels // reattach listeners to other channels
for (Entry<String, PubSubConnectionEntry> mapEntry : name2PubSubConnection.entrySet()) { for (Entry<String, PubSubConnectionEntry> mapEntry : name2PubSubConnection.entrySet()) {

@ -90,7 +90,7 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
// add master as slave if no more slaves available // add master as slave if no more slaves available
if (slaveBalancer.getAvailableClients() == 0) { if (slaveBalancer.getAvailableClients() == 0) {
InetSocketAddress addr = masterEntry.getClient().getAddr(); InetSocketAddress addr = masterEntry.getClient().getAddr();
slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.MANAGER); slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM);
log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort()); log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort());
} }
return conns; return conns;
@ -105,7 +105,10 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client, SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client,
this.config.getSlaveConnectionPoolSize(), this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionPoolSize(), connectListener, mode); this.config.getSlaveSubscriptionConnectionPoolSize(), connectListener, mode);
entry.setFreezed(freezed); if (freezed) {
entry.setFreezed(freezed);
entry.setFreezeReason(FreezeReason.SYSTEM);
}
slaveBalancer.add(entry); slaveBalancer.add(entry);
} }
@ -118,8 +121,9 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
return; return;
} }
InetSocketAddress addr = masterEntry.getClient().getAddr(); InetSocketAddress addr = masterEntry.getClient().getAddr();
// exclude master from slaves
if (!addr.getHostName().equals(host) || port != addr.getPort()) { if (!addr.getHostName().equals(host) || port != addr.getPort()) {
connectionManager.slaveDown(this, addr.getHostName(), addr.getPort(), FreezeReason.MANAGER); connectionManager.slaveDown(this, addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM);
} }
} }
@ -134,8 +138,8 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
setupMasterEntry(host, port); setupMasterEntry(host, port);
writeConnectionHolder.remove(oldMaster); writeConnectionHolder.remove(oldMaster);
if (slaveBalancer.getAvailableClients() > 1) { if (slaveBalancer.getAvailableClients() > 1) {
// more than one slave avaliable, so master could be removed from slaves // more than one slave available, so master could be removed from slaves
connectionManager.slaveDown(this, host, port, FreezeReason.MANAGER); connectionManager.slaveDown(this, host, port, FreezeReason.SYSTEM);
} }
connectionManager.shutdownAsync(oldMaster.getClient()); connectionManager.shutdownAsync(oldMaster.getClient());
} }

@ -15,9 +15,10 @@
*/ */
package org.redisson.misc; package org.redisson.misc;
import java.util.Deque;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -42,7 +43,7 @@ public class ConnectionPool<T extends RedisConnection> {
final List<SubscribesConnectionEntry> entries = new CopyOnWriteArrayList<SubscribesConnectionEntry>(); final List<SubscribesConnectionEntry> entries = new CopyOnWriteArrayList<SubscribesConnectionEntry>();
final ConcurrentLinkedQueue<Promise<T>> promises = new ConcurrentLinkedQueue<Promise<T>>(); final Deque<Promise<T>> promises = new LinkedBlockingDeque<Promise<T>>();
final ConnectionManager connectionManager; final ConnectionManager connectionManager;
@ -61,18 +62,26 @@ public class ConnectionPool<T extends RedisConnection> {
} }
public void add(final SubscribesConnectionEntry entry) { public void add(final SubscribesConnectionEntry entry) {
// is it a master connection pool? initConnections(entry, new Runnable() {
@Override
public void run() {
entries.add(entry);
handleQueue(entry, true);
}
}, true);
}
private void initConnections(final SubscribesConnectionEntry entry, final Runnable runnable, boolean checkFreezed) {
int minimumIdleSize = getMinimumIdleSize(entry); int minimumIdleSize = getMinimumIdleSize(entry);
if (minimumIdleSize == 0) { if (minimumIdleSize == 0) {
entries.add(entry); runnable.run();
handleQueue(entry);
return; return;
} }
final AtomicInteger completedConnections = new AtomicInteger(minimumIdleSize); final AtomicInteger completedConnections = new AtomicInteger(minimumIdleSize);
for (int i = 0; i < minimumIdleSize; i++) { for (int i = 0; i < minimumIdleSize; i++) {
if (entry.isFreezed() || !tryAcquireConnection(entry)) { if ((checkFreezed && entry.isFreezed()) || !tryAcquireConnection(entry)) {
continue; continue;
} }
@ -88,8 +97,7 @@ public class ConnectionPool<T extends RedisConnection> {
releaseConnection(entry); releaseConnection(entry);
if (completedConnections.decrementAndGet() == 0) { if (completedConnections.decrementAndGet() == 0) {
entries.add(entry); runnable.run();
handleQueue(entry);
} }
} }
}); });
@ -98,6 +106,7 @@ public class ConnectionPool<T extends RedisConnection> {
protected int getMinimumIdleSize(SubscribesConnectionEntry entry) { protected int getMinimumIdleSize(SubscribesConnectionEntry entry) {
int minimumIdleSize = config.getSlaveConnectionMinimumIdleSize(); int minimumIdleSize = config.getSlaveConnectionMinimumIdleSize();
// is it a master connection pool?
if (entry.getNodeType() == NodeType.MASTER && loadBalancer == null) { if (entry.getNodeType() == NodeType.MASTER && loadBalancer == null) {
minimumIdleSize = config.getMasterConnectionMinimumIdleSize(); minimumIdleSize = config.getMasterConnectionMinimumIdleSize();
} }
@ -129,7 +138,7 @@ public class ConnectionPool<T extends RedisConnection> {
} }
public Future<T> get(SubscribesConnectionEntry entry) { public Future<T> get(SubscribesConnectionEntry entry) {
if ((entry.getNodeType() == NodeType.MASTER || !entry.isFreezed()) if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed())
&& tryAcquireConnection(entry)) { && tryAcquireConnection(entry)) {
Promise<T> promise = connectionManager.newPromise(); Promise<T> promise = connectionManager.newPromise();
connect(entry, promise); connect(entry, promise);
@ -196,44 +205,59 @@ public class ConnectionPool<T extends RedisConnection> {
} }
private void promiseFailure(SubscribesConnectionEntry entry, Promise<T> promise, Throwable cause) { private void promiseFailure(SubscribesConnectionEntry entry, Promise<T> promise, Throwable cause) {
if (entry.incFailedAttempts() == config.getSlaveFailedAttempts() if (entry.incFailedAttempts() == config.getSlaveFailedAttempts()) {
&& entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(), connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(),
entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
scheduleCheck(entry); scheduleCheck(entry);
} else {
freezeMaster(entry);
}
} }
promise.setFailure(cause); promises.addFirst(promise);
// promise.tryFailure(cause);
}
private void freezeMaster(SubscribesConnectionEntry entry) {
synchronized (entry) {
if (!entry.isFreezed()) {
entry.setFreezed(true);
if (entry.getFreezeReason() == null) {
entry.setFreezeReason(FreezeReason.RECONNECT);
}
scheduleCheck(entry);
}
}
} }
private void promiseFailure(SubscribesConnectionEntry entry, Promise<T> promise, T conn) { private void promiseFailure(SubscribesConnectionEntry entry, Promise<T> promise, T conn) {
int attempts = entry.incFailedAttempts(); int attempts = entry.incFailedAttempts();
if (entry.getNodeType() == NodeType.SLAVE) { if (attempts == config.getSlaveFailedAttempts()) {
if (attempts == config.getSlaveFailedAttempts()) { if (entry.getNodeType() == NodeType.SLAVE) {
connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(), connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(),
entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
scheduleCheck(entry); scheduleCheck(entry);
} else if (attempts < config.getSlaveFailedAttempts()) {
releaseConnection(entry, conn);
} else { } else {
conn.closeAsync(); freezeMaster(entry);
} }
} else { } else if (attempts < config.getSlaveFailedAttempts()) {
releaseConnection(entry, conn); releaseConnection(entry, conn);
} }
releaseConnection(entry); releaseConnection(entry);
RedisConnectionException cause = new RedisConnectionException(conn + " is not active!"); promises.addFirst(promise);
promise.setFailure(cause); // RedisConnectionException cause = new RedisConnectionException(conn + " is not active!");
// promise.tryFailure(cause);
} }
private void scheduleCheck(final SubscribesConnectionEntry entry) { private void scheduleCheck(final SubscribesConnectionEntry entry) {
connectionManager.getTimer().newTimeout(new TimerTask() { connectionManager.getTimer().newTimeout(new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
if (entry.getFreezeReason() == FreezeReason.MANAGER if (entry.getFreezeReason() != FreezeReason.RECONNECT
|| !entry.isFreezed()) { || !entry.isFreezed()) {
return; return;
} }
@ -242,7 +266,7 @@ public class ConnectionPool<T extends RedisConnection> {
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> future) throws Exception { public void operationComplete(Future<RedisConnection> future) throws Exception {
if (entry.getFreezeReason() == FreezeReason.MANAGER if (entry.getFreezeReason() != FreezeReason.RECONNECT
|| !entry.isFreezed()) { || !entry.isFreezed()) {
return; return;
} }
@ -258,14 +282,32 @@ public class ConnectionPool<T extends RedisConnection> {
@Override @Override
public void operationComplete(Future<String> future) throws Exception { public void operationComplete(Future<String> future) throws Exception {
try { try {
if (entry.getFreezeReason() == FreezeReason.MANAGER if (entry.getFreezeReason() != FreezeReason.RECONNECT
|| !entry.isFreezed()) { || !entry.isFreezed()) {
return; return;
} }
if (future.isSuccess() && "PONG".equals(future.getNow())) { if (future.isSuccess() && "PONG".equals(future.getNow())) {
entry.resetFailedAttempts(); entry.resetFailedAttempts();
masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); initConnections(entry, new Runnable() {
@Override
public void run() {
if (entry.getNodeType() == NodeType.SLAVE) {
handleQueue(entry, false);
masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
} else {
synchronized (entry) {
if (entry.getFreezeReason() == FreezeReason.RECONNECT) {
handleQueue(entry, false);
entry.setFreezed(false);
entry.setFreezeReason(null);
}
}
}
}
}, false);
} else { } else {
scheduleCheck(entry); scheduleCheck(entry);
} }
@ -299,17 +341,28 @@ public class ConnectionPool<T extends RedisConnection> {
protected void releaseConnection(SubscribesConnectionEntry entry) { protected void releaseConnection(SubscribesConnectionEntry entry) {
entry.releaseConnection(); entry.releaseConnection();
handleQueue(entry); handleQueue(entry, true);
} }
private void handleQueue(SubscribesConnectionEntry entry) { private void handleQueue(SubscribesConnectionEntry entry, boolean checkFreezed) {
Promise<T> promise = promises.poll(); while (true) {
if (promise != null) { if (checkFreezed && entry.isFreezed()) {
if (!entry.isFreezed() && tryAcquireConnection(entry)) { return;
connect(entry, promise); }
Promise<T> promise = promises.poll();
if (promise == null) {
return;
}
if (promise.isCancelled()) {
continue;
}
if (!tryAcquireConnection(entry)) {
promises.addFirst(promise);
} else { } else {
promises.add(promise); connect(entry, promise);
} }
return;
} }
} }

Loading…
Cancel
Save