From 387343e304db7d33fb054756281c5dc4f248e8d7 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 16 Sep 2015 15:09:03 +0300 Subject: [PATCH] RedisPubSubConnection channel resubscribe. #248 --- .../client/RedisPubSubConnection.java | 25 ++++++++ .../client/handler/ConnectionWatchdog.java | 59 +++++++++++++------ 2 files changed, 67 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/redisson/client/RedisPubSubConnection.java b/src/main/java/org/redisson/client/RedisPubSubConnection.java index ca4560acc..e12af0f62 100644 --- a/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -15,6 +15,8 @@ */ package org.redisson.client; +import java.util.Collections; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -30,10 +32,13 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import io.netty.channel.Channel; +import io.netty.util.internal.PlatformDependent; public class RedisPubSubConnection extends RedisConnection { final Queue> listeners = new ConcurrentLinkedQueue>(); + final Map channels = PlatformDependent.newConcurrentHashMap(); + final Map patternChannels = PlatformDependent.newConcurrentHashMap(); public RedisPubSubConnection(RedisClient redisClient, Channel channel) { super(redisClient, channel); @@ -71,22 +76,42 @@ public class RedisPubSubConnection extends RedisConnection { public void subscribe(Codec codec, String ... channel) { async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channel); + for (String ch : channel) { + channels.put(ch, codec); + } } public void psubscribe(Codec codec, String ... channel) { async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channel); + for (String ch : channel) { + patternChannels.put(ch, codec); + } } public void unsubscribe(String ... channel) { async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channel); + for (String ch : channel) { + channels.remove(ch); + } } public void punsubscribe(String ... channel) { async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channel); + for (String ch : channel) { + patternChannels.remove(ch); + } } private void async(MultiDecoder messageDecoder, RedisCommand command, Object ... params) { channel.writeAndFlush(new CommandData(null, messageDecoder, null, command, params)); } + public Map getChannels() { + return Collections.unmodifiableMap(channels); + } + + public Map getPatternChannels() { + return Collections.unmodifiableMap(patternChannels); + } + } diff --git a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index d55b7acbf..f02112b8d 100644 --- a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -15,14 +15,18 @@ */ package org.redisson.client.handler; +import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; +import org.redisson.client.RedisPubSubConnection; +import org.redisson.client.codec.Codec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -63,12 +67,12 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { group.schedule(new Runnable() { @Override public void run() { - doReConnect(group, connection, 1); + tryReconnect(group, connection, 1); } }, 100, TimeUnit.MILLISECONDS); } - private void doReConnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) { + private void tryReconnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) { if (connection.isClosed()) { return; } @@ -86,20 +90,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { try { if (future.isSuccess()) { log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr()); - - if (connection.getReconnectListener() != null) { - bootstrap.group().execute(new Runnable() { - @Override - public void run() { - // new connection used only to init channel - RedisConnection rc = new RedisConnection(connection.getRedisClient(), future.channel()); - connection.getReconnectListener().onReconnect(rc); - connection.updateChannel(future.channel()); - } - }); - } else { - connection.updateChannel(future.channel()); - } + reconnect(connection, future.channel()); return; } } catch (RedisException e) { @@ -110,14 +101,48 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { group.schedule(new Runnable() { @Override public void run() { - doReConnect(group, connection, Math.min(BACKOFF_CAP, attempts + 1)); + tryReconnect(group, connection, Math.min(BACKOFF_CAP, attempts + 1)); } }, timeout, TimeUnit.MILLISECONDS); } + }); } + private void reconnect(final RedisConnection connection, final Channel channel) { + if (connection.getReconnectListener() != null) { + bootstrap.group().execute(new Runnable() { + @Override + public void run() { + // new connection used only for channel init + RedisConnection rc = new RedisConnection(connection.getRedisClient(), channel); + connection.getReconnectListener().onReconnect(rc); + connection.updateChannel(channel); + + resubscribe(connection); + } + + }); + } else { + connection.updateChannel(channel); + + resubscribe(connection); + } + } + + private void resubscribe(RedisConnection connection) { + if (connection instanceof RedisPubSubConnection) { + RedisPubSubConnection conn = (RedisPubSubConnection) connection; + for (Entry entry : conn.getChannels().entrySet()) { + conn.subscribe(entry.getValue(), entry.getKey()); + } + for (Entry entry : conn.getPatternChannels().entrySet()) { + conn.psubscribe(entry.getValue(), entry.getKey()); + } + } + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().close();