diff --git a/src/main/java/org/redisson/client/RedisPubSubConnection.java b/src/main/java/org/redisson/client/RedisPubSubConnection.java index ca4560acc..e12af0f62 100644 --- a/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -15,6 +15,8 @@ */ package org.redisson.client; +import java.util.Collections; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -30,10 +32,13 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import io.netty.channel.Channel; +import io.netty.util.internal.PlatformDependent; public class RedisPubSubConnection extends RedisConnection { final Queue> listeners = new ConcurrentLinkedQueue>(); + final Map channels = PlatformDependent.newConcurrentHashMap(); + final Map patternChannels = PlatformDependent.newConcurrentHashMap(); public RedisPubSubConnection(RedisClient redisClient, Channel channel) { super(redisClient, channel); @@ -71,22 +76,42 @@ public class RedisPubSubConnection extends RedisConnection { public void subscribe(Codec codec, String ... channel) { async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channel); + for (String ch : channel) { + channels.put(ch, codec); + } } public void psubscribe(Codec codec, String ... channel) { async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channel); + for (String ch : channel) { + patternChannels.put(ch, codec); + } } public void unsubscribe(String ... channel) { async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channel); + for (String ch : channel) { + channels.remove(ch); + } } public void punsubscribe(String ... channel) { async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channel); + for (String ch : channel) { + patternChannels.remove(ch); + } } private void async(MultiDecoder messageDecoder, RedisCommand command, Object ... params) { channel.writeAndFlush(new CommandData(null, messageDecoder, null, command, params)); } + public Map getChannels() { + return Collections.unmodifiableMap(channels); + } + + public Map getPatternChannels() { + return Collections.unmodifiableMap(patternChannels); + } + } diff --git a/src/main/java/org/redisson/client/codec/LongCodec.java b/src/main/java/org/redisson/client/codec/LongCodec.java index 8d352602a..53bf14604 100644 --- a/src/main/java/org/redisson/client/codec/LongCodec.java +++ b/src/main/java/org/redisson/client/codec/LongCodec.java @@ -25,14 +25,16 @@ public class LongCodec extends StringCodec { public static final LongCodec INSTANCE = new LongCodec(); + public final Decoder decoder = new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) { + return Long.valueOf(buf.toString(CharsetUtil.UTF_8)); + } + }; + @Override public Decoder getValueDecoder() { - return new Decoder() { - @Override - public Object decode(ByteBuf buf, State state) { - return Long.valueOf(buf.toString(CharsetUtil.UTF_8)); - } - }; + return decoder; } } diff --git a/src/main/java/org/redisson/client/codec/StringCodec.java b/src/main/java/org/redisson/client/codec/StringCodec.java index 53b07a6a4..8a5e2f19c 100644 --- a/src/main/java/org/redisson/client/codec/StringCodec.java +++ b/src/main/java/org/redisson/client/codec/StringCodec.java @@ -28,24 +28,28 @@ public class StringCodec implements Codec { public static final StringCodec INSTANCE = new StringCodec(); + private final Encoder encoder = new Encoder() { + @Override + public byte[] encode(Object in) throws IOException { + return in.toString().getBytes("UTF-8"); + } + }; + + private final Decoder decoder = new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) { + return buf.toString(CharsetUtil.UTF_8); + } + }; + @Override public Decoder getValueDecoder() { - return new Decoder() { - @Override - public Object decode(ByteBuf buf, State state) { - return buf.toString(CharsetUtil.UTF_8); - } - }; + return decoder; } @Override public Encoder getValueEncoder() { - return new Encoder() { - @Override - public byte[] encode(Object in) throws IOException { - return in.toString().getBytes("UTF-8"); - } - }; + return encoder; } @Override diff --git a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index d55b7acbf..f02112b8d 100644 --- a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -15,14 +15,18 @@ */ package org.redisson.client.handler; +import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; +import org.redisson.client.RedisPubSubConnection; +import org.redisson.client.codec.Codec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -63,12 +67,12 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { group.schedule(new Runnable() { @Override public void run() { - doReConnect(group, connection, 1); + tryReconnect(group, connection, 1); } }, 100, TimeUnit.MILLISECONDS); } - private void doReConnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) { + private void tryReconnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) { if (connection.isClosed()) { return; } @@ -86,20 +90,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { try { if (future.isSuccess()) { log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr()); - - if (connection.getReconnectListener() != null) { - bootstrap.group().execute(new Runnable() { - @Override - public void run() { - // new connection used only to init channel - RedisConnection rc = new RedisConnection(connection.getRedisClient(), future.channel()); - connection.getReconnectListener().onReconnect(rc); - connection.updateChannel(future.channel()); - } - }); - } else { - connection.updateChannel(future.channel()); - } + reconnect(connection, future.channel()); return; } } catch (RedisException e) { @@ -110,14 +101,48 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { group.schedule(new Runnable() { @Override public void run() { - doReConnect(group, connection, Math.min(BACKOFF_CAP, attempts + 1)); + tryReconnect(group, connection, Math.min(BACKOFF_CAP, attempts + 1)); } }, timeout, TimeUnit.MILLISECONDS); } + }); } + private void reconnect(final RedisConnection connection, final Channel channel) { + if (connection.getReconnectListener() != null) { + bootstrap.group().execute(new Runnable() { + @Override + public void run() { + // new connection used only for channel init + RedisConnection rc = new RedisConnection(connection.getRedisClient(), channel); + connection.getReconnectListener().onReconnect(rc); + connection.updateChannel(channel); + + resubscribe(connection); + } + + }); + } else { + connection.updateChannel(channel); + + resubscribe(connection); + } + } + + private void resubscribe(RedisConnection connection) { + if (connection instanceof RedisPubSubConnection) { + RedisPubSubConnection conn = (RedisPubSubConnection) connection; + for (Entry entry : conn.getChannels().entrySet()) { + conn.subscribe(entry.getValue(), entry.getKey()); + } + for (Entry entry : conn.getPatternChannels().entrySet()) { + conn.psubscribe(entry.getValue(), entry.getKey()); + } + } + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().close(); diff --git a/src/main/java/org/redisson/codec/JsonJacksonCodec.java b/src/main/java/org/redisson/codec/JsonJacksonCodec.java index 99d0fb720..564a97881 100755 --- a/src/main/java/org/redisson/codec/JsonJacksonCodec.java +++ b/src/main/java/org/redisson/codec/JsonJacksonCodec.java @@ -44,7 +44,21 @@ import io.netty.buffer.ByteBufInputStream; */ public class JsonJacksonCodec implements Codec { - private ObjectMapper mapObjectMapper = new ObjectMapper(); + private final ObjectMapper mapObjectMapper = new ObjectMapper(); + + private final Encoder encoder = new Encoder() { + @Override + public byte[] encode(Object in) throws IOException { + return mapObjectMapper.writeValueAsBytes(in); + } + }; + + private final Decoder decoder = new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + return mapObjectMapper.readValue(new ByteBufInputStream(buf), Object.class); + } + }; public JsonJacksonCodec() { init(mapObjectMapper); @@ -94,24 +108,12 @@ public class JsonJacksonCodec implements Codec { @Override public Decoder getMapValueDecoder() { - return new Decoder() { - - @Override - public Object decode(ByteBuf buf, State state) throws IOException { - return mapObjectMapper.readValue(new ByteBufInputStream(buf), Object.class); - } - }; + return decoder; } @Override public Encoder getMapValueEncoder() { - return new Encoder() { - - @Override - public byte[] encode(Object in) throws IOException { - return mapObjectMapper.writeValueAsBytes(in); - } - }; + return encoder; } @Override diff --git a/src/main/java/org/redisson/codec/KryoCodec.java b/src/main/java/org/redisson/codec/KryoCodec.java index 3518f8fbe..2da3febad 100755 --- a/src/main/java/org/redisson/codec/KryoCodec.java +++ b/src/main/java/org/redisson/codec/KryoCodec.java @@ -93,6 +93,51 @@ public class KryoCodec implements Codec { private final KryoPool kryoPool; + private final Decoder decoder = new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + Kryo kryo = null; + try { + kryo = kryoPool.get(); + return kryo.readClassAndObject(new Input(new ByteBufInputStream(buf))); + } catch (Exception e) { + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new RedissonKryoCodecException(e); + } finally { + if (kryo != null) { + kryoPool.yield(kryo); + } + } + } + }; + + private final Encoder encoder = new Encoder() { + + @Override + public byte[] encode(Object in) throws IOException { + Kryo kryo = null; + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Output output = new Output(baos); + kryo = kryoPool.get(); + kryo.writeClassAndObject(output, in); + output.close(); + return baos.toByteArray(); + } catch (Exception e) { + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new RedissonKryoCodecException(e); + } finally { + if (kryo != null) { + kryoPool.yield(kryo); + } + } + } + }; + public KryoCodec() { this(new KryoPoolImpl(Collections.>emptyList())); } @@ -128,54 +173,12 @@ public class KryoCodec implements Codec { @Override public Decoder getValueDecoder() { - return new Decoder() { - - @Override - public Object decode(ByteBuf buf, State state) throws IOException { - Kryo kryo = null; - try { - kryo = kryoPool.get(); - return kryo.readClassAndObject(new Input(new ByteBufInputStream(buf))); - } catch (Exception e) { - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } - throw new RedissonKryoCodecException(e); - } finally { - if (kryo != null) { - kryoPool.yield(kryo); - } - } - } - }; + return decoder; } @Override public Encoder getValueEncoder() { - return new Encoder() { - - @Override - public byte[] encode(Object in) throws IOException { - Kryo kryo = null; - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - Output output = new Output(baos); - kryo = kryoPool.get(); - kryo.writeClassAndObject(output, in); - output.close(); - return baos.toByteArray(); - } catch (Exception e) { - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } - throw new RedissonKryoCodecException(e); - } finally { - if (kryo != null) { - kryoPool.yield(kryo); - } - } - } - }; + return encoder; } } diff --git a/src/main/java/org/redisson/codec/SerializationCodec.java b/src/main/java/org/redisson/codec/SerializationCodec.java index 9234af398..b698bd130 100644 --- a/src/main/java/org/redisson/codec/SerializationCodec.java +++ b/src/main/java/org/redisson/codec/SerializationCodec.java @@ -35,6 +35,32 @@ import io.netty.buffer.ByteBufInputStream; */ public class SerializationCodec implements Codec { + private final Decoder decoder = new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + try { + ObjectInputStream inputStream = new ObjectInputStream(new ByteBufInputStream(buf)); + return inputStream.readObject(); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + } + }; + + private final Encoder encoder = new Encoder() { + + @Override + public byte[] encode(Object in) throws IOException { + ByteArrayOutputStream result = new ByteArrayOutputStream(); + ObjectOutputStream outputStream = new ObjectOutputStream(result); + outputStream.writeObject(in); + outputStream.close(); + return result.toByteArray(); + } + }; + @Override public Decoder getMapValueDecoder() { return getValueDecoder(); @@ -57,34 +83,12 @@ public class SerializationCodec implements Codec { @Override public Decoder getValueDecoder() { - return new Decoder() { - @Override - public Object decode(ByteBuf buf, State state) throws IOException { - try { - ObjectInputStream inputStream = new ObjectInputStream(new ByteBufInputStream(buf)); - return inputStream.readObject(); - } catch (IOException e) { - throw e; - } catch (Exception e) { - throw new IOException(e); - } - } - }; + return decoder; } @Override public Encoder getValueEncoder() { - return new Encoder() { - - @Override - public byte[] encode(Object in) throws IOException { - ByteArrayOutputStream result = new ByteArrayOutputStream(); - ObjectOutputStream outputStream = new ObjectOutputStream(result); - outputStream.writeObject(in); - outputStream.close(); - return result.toByteArray(); - } - }; + return encoder; } } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index eb6676f49..8078b8262 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -374,7 +374,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return null; } - return entry.unsubscribe(channelName, new BaseRedisPubSubListener() { + Codec entryCodec = entry.getConnection().getChannels().get(channelName); + entry.unsubscribe(channelName, new BaseRedisPubSubListener() { @Override public boolean onStatus(PubSubType type, String channel) { @@ -390,6 +391,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } }); + return entryCodec; } @Override @@ -399,7 +401,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return null; } - return entry.punsubscribe(channelName, new BaseRedisPubSubListener() { + Codec entryCodec = entry.getConnection().getPatternChannels().get(channelName); + entry.punsubscribe(channelName, new BaseRedisPubSubListener() { @Override public boolean onStatus(PubSubType type, String channel) { @@ -415,6 +418,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } }); + return entryCodec; } protected MasterSlaveEntry getEntry(int slot) { @@ -438,13 +442,24 @@ public class MasterSlaveConnectionManager implements ConnectionManager { entry.close(); Collection listeners = entry.getListeners(channelName); - Codec subscribeCodec = unsubscribe(channelName); - if (!listeners.isEmpty()) { - PubSubConnectionEntry newEntry = subscribe(channelName, subscribeCodec); - for (RedisPubSubListener redisPubSubListener : listeners) { - newEntry.addListener(channelName, redisPubSubListener); + if (entry.getConnection().getPatternChannels().get(channelName) != null) { + Codec subscribeCodec = punsubscribe(channelName); + if (!listeners.isEmpty()) { + PubSubConnectionEntry newEntry = psubscribe(channelName, subscribeCodec); + for (RedisPubSubListener redisPubSubListener : listeners) { + newEntry.addListener(channelName, redisPubSubListener); + } + log.debug("resubscribed listeners for '{}' channel-pattern", channelName); + } + } else { + Codec subscribeCodec = unsubscribe(channelName); + if (!listeners.isEmpty()) { + PubSubConnectionEntry newEntry = subscribe(channelName, subscribeCodec); + for (RedisPubSubListener redisPubSubListener : listeners) { + newEntry.addListener(channelName, redisPubSubListener); + } + log.debug("resubscribed listeners for '{}' channel", channelName); } - log.debug("resubscribed listeners for '{}' channel", channelName); } } } diff --git a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index 14f84e933..3c928ba8a 100644 --- a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -18,7 +18,6 @@ package org.redisson.connection; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -33,8 +32,6 @@ import org.redisson.client.protocol.pubsub.PubSubType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.util.internal.PlatformDependent; - public class PubSubConnectionEntry { public enum Status {ACTIVE, INACTIVE} @@ -46,7 +43,6 @@ public class PubSubConnectionEntry { private final RedisPubSubConnection conn; private final int subscriptionsPerConnection; - private final Map channel2Codec = PlatformDependent.newConcurrentHashMap(); private final ConcurrentMap> channelListeners = new ConcurrentHashMap>(); public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) { @@ -127,12 +123,10 @@ public class PubSubConnectionEntry { } public void subscribe(Codec codec, String channelName) { - channel2Codec.put(channelName, codec); conn.subscribe(codec, channelName); } public void psubscribe(Codec codec, String pattern) { - channel2Codec.put(pattern, codec); conn.psubscribe(codec, pattern); } @@ -141,7 +135,7 @@ public class PubSubConnectionEntry { conn.subscribe(codec, channel); } - public Codec unsubscribe(final String channel, RedisPubSubListener listener) { + public void unsubscribe(final String channel, RedisPubSubListener listener) { conn.addOneShotListener(new BaseRedisPubSubListener() { @Override public boolean onStatus(PubSubType type, String ch) { @@ -155,7 +149,6 @@ public class PubSubConnectionEntry { }); conn.addOneShotListener(listener); conn.unsubscribe(channel); - return channel2Codec.remove(channel); } private void removeListeners(String channel) { @@ -171,7 +164,7 @@ public class PubSubConnectionEntry { subscribedChannelsAmount.release(); } - public Codec punsubscribe(final String channel, RedisPubSubListener listener) { + public void punsubscribe(final String channel, RedisPubSubListener listener) { conn.addOneShotListener(new BaseRedisPubSubListener() { @Override public boolean onStatus(PubSubType type, String ch) { @@ -184,7 +177,6 @@ public class PubSubConnectionEntry { }); conn.addOneShotListener(listener); conn.punsubscribe(channel); - return channel2Codec.remove(channel); }