RedisPubSubConnection channel resubscribe. #248

pull/255/head
Nikita 10 years ago
parent 51542946d0
commit 387343e304

@ -15,6 +15,8 @@
*/
package org.redisson.client;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -30,10 +32,13 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import io.netty.channel.Channel;
import io.netty.util.internal.PlatformDependent;
public class RedisPubSubConnection extends RedisConnection {
final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();
final Map<String, Codec> channels = PlatformDependent.newConcurrentHashMap();
final Map<String, Codec> patternChannels = PlatformDependent.newConcurrentHashMap();
public RedisPubSubConnection(RedisClient redisClient, Channel channel) {
super(redisClient, channel);
@ -71,22 +76,42 @@ public class RedisPubSubConnection extends RedisConnection {
public void subscribe(Codec codec, String ... channel) {
async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channel);
for (String ch : channel) {
channels.put(ch, codec);
}
}
public void psubscribe(Codec codec, String ... channel) {
async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channel);
for (String ch : channel) {
patternChannels.put(ch, codec);
}
}
public void unsubscribe(String ... channel) {
async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channel);
for (String ch : channel) {
channels.remove(ch);
}
}
public void punsubscribe(String ... channel) {
async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channel);
for (String ch : channel) {
patternChannels.remove(ch);
}
}
private <T, R> void async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {
channel.writeAndFlush(new CommandData<T, R>(null, messageDecoder, null, command, params));
}
public Map<String, Codec> getChannels() {
return Collections.unmodifiableMap(channels);
}
public Map<String, Codec> getPatternChannels() {
return Collections.unmodifiableMap(patternChannels);
}
}

@ -15,14 +15,18 @@
*/
package org.redisson.client.handler;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
@ -63,12 +67,12 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
group.schedule(new Runnable() {
@Override
public void run() {
doReConnect(group, connection, 1);
tryReconnect(group, connection, 1);
}
}, 100, TimeUnit.MILLISECONDS);
}
private void doReConnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) {
private void tryReconnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) {
if (connection.isClosed()) {
return;
}
@ -86,20 +90,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
try {
if (future.isSuccess()) {
log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr());
if (connection.getReconnectListener() != null) {
bootstrap.group().execute(new Runnable() {
@Override
public void run() {
// new connection used only to init channel
RedisConnection rc = new RedisConnection(connection.getRedisClient(), future.channel());
connection.getReconnectListener().onReconnect(rc);
connection.updateChannel(future.channel());
}
});
} else {
connection.updateChannel(future.channel());
}
reconnect(connection, future.channel());
return;
}
} catch (RedisException e) {
@ -110,14 +101,48 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
group.schedule(new Runnable() {
@Override
public void run() {
doReConnect(group, connection, Math.min(BACKOFF_CAP, attempts + 1));
tryReconnect(group, connection, Math.min(BACKOFF_CAP, attempts + 1));
}
}, timeout, TimeUnit.MILLISECONDS);
}
});
}
private void reconnect(final RedisConnection connection, final Channel channel) {
if (connection.getReconnectListener() != null) {
bootstrap.group().execute(new Runnable() {
@Override
public void run() {
// new connection used only for channel init
RedisConnection rc = new RedisConnection(connection.getRedisClient(), channel);
connection.getReconnectListener().onReconnect(rc);
connection.updateChannel(channel);
resubscribe(connection);
}
});
} else {
connection.updateChannel(channel);
resubscribe(connection);
}
}
private void resubscribe(RedisConnection connection) {
if (connection instanceof RedisPubSubConnection) {
RedisPubSubConnection conn = (RedisPubSubConnection) connection;
for (Entry<String, Codec> entry : conn.getChannels().entrySet()) {
conn.subscribe(entry.getValue(), entry.getKey());
}
for (Entry<String, Codec> entry : conn.getPatternChannels().entrySet()) {
conn.psubscribe(entry.getValue(), entry.getKey());
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();

Loading…
Cancel
Save