Long BlockingQueue methods re-subscription when slave is down

pull/456/head
Nikita 9 years ago
parent a60d3f491e
commit 901be4439e

@ -23,7 +23,6 @@ import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.QueueCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -151,26 +150,28 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
}
private void refresh(RedisConnection connection, Channel channel) {
CommandData commandData = connection.getCurrentCommand();
CommandData<?, ?> commandData = connection.getCurrentCommand();
connection.updateChannel(channel);
reattachBlockingQueue(connection, commandData);
reattachPubSub(connection);
}
private void reattachBlockingQueue(RedisConnection connection, final CommandData commandData) {
if (commandData != null
&& QueueCommand.TIMEOUTLESS_COMMANDS.contains(commandData.getCommand().getName())) {
ChannelFuture future = connection.send(commandData);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't reconnect blocking queue to new connection {}", commandData);
}
}
});
private void reattachBlockingQueue(RedisConnection connection, final CommandData<?, ?> commandData) {
if (commandData == null
|| !commandData.isBlockingCommand()) {
return;
}
ChannelFuture future = connection.send(commandData);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't reconnect blocking queue to new connection. {}", commandData);
}
}
});
}
}

@ -77,5 +77,9 @@ public class CommandData<T, R> implements QueueCommand {
}
return Collections.emptyList();
}
public boolean isBlockingCommand() {
return QueueCommand.TIMEOUTLESS_COMMANDS.contains(command.getName()) && !promise.isDone();
}
}

@ -158,19 +158,19 @@ public class ClientConnectionsEntry {
}
private <T extends RedisConnection> void addReconnectListener(Promise<T> connectionFuture, T conn) {
connectionManager.getConnectListener().onConnect(connectionFuture, conn, nodeType, connectionManager.getConfig());
addFireEventListener(connectionFuture);
addFireEventListener(conn, connectionFuture);
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn, Promise<RedisConnection> connectionFuture) {
connectionManager.getConnectListener().onConnect(connectionFuture, conn, nodeType, connectionManager.getConfig());
addFireEventListener(connectionFuture);
addFireEventListener(conn, connectionFuture);
}
});
}
private <T extends RedisConnection> void addFireEventListener(Promise<T> connectionFuture) {
private <T extends RedisConnection> void addFireEventListener(T conn, Promise<T> connectionFuture) {
connectionManager.getConnectListener().onConnect(connectionFuture, conn, nodeType, connectionManager.getConfig());
if (connectionFuture.isSuccess()) {
connectionManager.getConnectionEventsHub().fireConnect(connectionFuture.getNow().getRedisClient().getAddr());
return;
@ -196,10 +196,12 @@ public class ClientConnectionsEntry {
connectionFuture.tryFailure(future.cause());
return;
}
RedisPubSubConnection conn = future.getNow();
log.debug("new pubsub connection created: {}", conn);
addReconnectListener(connectionFuture, conn);
allSubscribeConnections.add(conn);
}

@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
@ -44,8 +43,6 @@ import io.netty.util.concurrent.Promise;
*/
public interface ConnectionManager {
void reattachPubSub(Collection<RedisPubSubConnection> allPubSubConnections);
boolean isClusterMode();
<R> Future<R> newSucceededFuture(R value);

@ -542,78 +542,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return null;
}
@Override
public void reattachPubSub(Collection<RedisPubSubConnection> allPubSubConnections) {
for (Entry<String, PubSubConnectionEntry> mapEntry : name2PubSubConnection.entrySet()) {
for (RedisPubSubConnection redisPubSubConnection : allPubSubConnections) {
PubSubConnectionEntry pubSubEntry = mapEntry.getValue();
String channelName = mapEntry.getKey();
if (!pubSubEntry.getConnection().equals(redisPubSubConnection)) {
continue;
}
synchronized (pubSubEntry) {
pubSubEntry.close();
Collection<RedisPubSubListener> listeners = pubSubEntry.getListeners(channelName);
if (pubSubEntry.getConnection().getPatternChannels().get(channelName) != null) {
reattachPatternPubSubListeners(channelName, listeners);
} else {
reattachPubSubListeners(channelName, listeners);
}
}
}
}
}
private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener> listeners) {
Codec subscribeCodec = unsubscribe(channelName);
if (!listeners.isEmpty()) {
Future<PubSubConnectionEntry> future = subscribe(subscribeCodec, channelName, null);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe topic channel: " + channelName);
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel", channelName);
}
});
}
}
private void reattachPatternPubSubListeners(final String channelName,
final Collection<RedisPubSubListener> listeners) {
Codec subscribeCodec = punsubscribe(channelName);
if (!listeners.isEmpty()) {
Future<PubSubConnectionEntry> future = psubscribe(channelName, subscribeCodec);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe topic channel: " + channelName);
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel-pattern", channelName);
}
});
}
}
protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) {
getEntry(slotRange).slaveDown(host, port, freezeReason);
}

@ -28,6 +28,9 @@ import org.redisson.ReadMode;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager;
@ -37,7 +40,10 @@ import org.redisson.core.NodeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
@ -91,7 +97,8 @@ public class MasterSlaveEntry {
}
public boolean slaveDown(String host, int port, FreezeReason freezeReason) {
if (!slaveBalancer.freeze(host, port, freezeReason)) {
ClientConnectionsEntry entry = slaveBalancer.freeze(host, port, freezeReason);
if (entry == null) {
return false;
}
@ -102,8 +109,154 @@ public class MasterSlaveEntry {
log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort());
}
}
// close all connections
while (true) {
final RedisConnection connection = entry.pollConnection();
if (connection == null) {
break;
}
connection.closeAsync().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
reattachBlockingQueue(connection);
}
});
}
// close all pub/sub connections
while (true) {
RedisPubSubConnection connection = entry.pollSubscribeConnection();
if (connection == null) {
break;
}
connection.closeAsync();
}
for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) {
reattachPubSub(connection);
}
entry.getAllSubscribeConnections().clear();
return true;
}
private void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
for (String channelName : redisPubSubConnection.getChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
synchronized (pubSubEntry) {
pubSubEntry.close();
Collection<RedisPubSubListener> listeners = pubSubEntry.getListeners(channelName);
reattachPubSubListeners(channelName, listeners);
}
}
for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
synchronized (pubSubEntry) {
pubSubEntry.close();
Collection<RedisPubSubListener> listeners = pubSubEntry.getListeners(channelName);
reattachPatternPubSubListeners(channelName, listeners);
}
}
}
private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener> listeners) {
Codec subscribeCodec = connectionManager.unsubscribe(channelName);
if (!listeners.isEmpty()) {
Future<PubSubConnectionEntry> future = connectionManager.subscribe(subscribeCodec, channelName, null);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe topic channel: " + channelName);
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel", channelName);
}
});
}
}
private void reattachPatternPubSubListeners(final String channelName,
final Collection<RedisPubSubListener> listeners) {
Codec subscribeCodec = connectionManager.punsubscribe(channelName);
if (!listeners.isEmpty()) {
Future<PubSubConnectionEntry> future = connectionManager.psubscribe(channelName, subscribeCodec);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe topic channel: " + channelName);
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel-pattern", channelName);
}
});
}
}
private void reattachBlockingQueue(RedisConnection connection) {
final CommandData<?, ?> commandData = connection.getCurrentCommand();
if (commandData == null
|| !commandData.isBlockingCommand()) {
return;
}
Future<RedisConnection> newConnection = connectionReadOp();
newConnection.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe blocking queue {}", commandData);
return;
}
final RedisConnection newConnection = future.getNow();
final FutureListener<Object> listener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
releaseRead(newConnection);
}
};
commandData.getPromise().addListener(listener);
if (commandData.getPromise().isDone()) {
return;
}
ChannelFuture channelFuture = newConnection.send(commandData);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
listener.operationComplete(null);
commandData.getPromise().removeListener(listener);
releaseRead(newConnection);
log.error("Can't resubscribe blocking queue {}", commandData);
}
}
});
}
});
}
public Future<Void> addSlave(String host, int port) {
return addSlave(host, port, true, NodeType.SLAVE);

@ -36,7 +36,7 @@ public interface LoadBalancerManager {
boolean unfreeze(String host, int port, FreezeReason freezeReason);
boolean freeze(String host, int port, FreezeReason freezeReason);
ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason);
Future<Void> add(ClientConnectionsEntry entry);

@ -95,16 +95,16 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager {
return false;
}
public boolean freeze(String host, int port, FreezeReason freezeReason) {
public ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port);
ClientConnectionsEntry connectionEntry = addr2Entry.get(addr);
if (connectionEntry == null) {
return false;
return null;
}
synchronized (connectionEntry) {
if (connectionEntry.isFreezed()) {
return false;
return null;
}
connectionEntry.setFreezed(true);
@ -116,27 +116,7 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager {
}
}
// close all connections
while (true) {
RedisConnection connection = connectionEntry.pollConnection();
if (connection == null) {
break;
}
connection.closeAsync();
}
// close all pub/sub connections
while (true) {
RedisPubSubConnection connection = connectionEntry.pollSubscribeConnection();
if (connection == null) {
break;
}
connection.closeAsync();
}
connectionManager.reattachPubSub(connectionEntry.getAllSubscribeConnections());
connectionEntry.getAllSubscribeConnections().clear();
return true;
return connectionEntry;
}
public Future<RedisPubSubConnection> nextPubSubConnection() {

Loading…
Cancel
Save