Merge branch 'mrniko/master'

pull/282/head
Rui Gu 10 years ago
commit 61ecf7f1f3

@ -15,6 +15,8 @@
*/ */
package org.redisson.client; package org.redisson.client;
import java.util.Collections;
import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
@ -30,10 +32,13 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.util.internal.PlatformDependent;
public class RedisPubSubConnection extends RedisConnection { public class RedisPubSubConnection extends RedisConnection {
final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>(); final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();
final Map<String, Codec> channels = PlatformDependent.newConcurrentHashMap();
final Map<String, Codec> patternChannels = PlatformDependent.newConcurrentHashMap();
public RedisPubSubConnection(RedisClient redisClient, Channel channel) { public RedisPubSubConnection(RedisClient redisClient, Channel channel) {
super(redisClient, channel); super(redisClient, channel);
@ -71,22 +76,42 @@ public class RedisPubSubConnection extends RedisConnection {
public void subscribe(Codec codec, String ... channel) { public void subscribe(Codec codec, String ... channel) {
async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channel); async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channel);
for (String ch : channel) {
channels.put(ch, codec);
}
} }
public void psubscribe(Codec codec, String ... channel) { public void psubscribe(Codec codec, String ... channel) {
async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channel); async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channel);
for (String ch : channel) {
patternChannels.put(ch, codec);
}
} }
public void unsubscribe(String ... channel) { public void unsubscribe(String ... channel) {
async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channel); async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channel);
for (String ch : channel) {
channels.remove(ch);
}
} }
public void punsubscribe(String ... channel) { public void punsubscribe(String ... channel) {
async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channel); async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channel);
for (String ch : channel) {
patternChannels.remove(ch);
}
} }
private <T, R> void async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) { private <T, R> void async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {
channel.writeAndFlush(new CommandData<T, R>(null, messageDecoder, null, command, params)); channel.writeAndFlush(new CommandData<T, R>(null, messageDecoder, null, command, params));
} }
public Map<String, Codec> getChannels() {
return Collections.unmodifiableMap(channels);
}
public Map<String, Codec> getPatternChannels() {
return Collections.unmodifiableMap(patternChannels);
}
} }

@ -25,14 +25,16 @@ public class LongCodec extends StringCodec {
public static final LongCodec INSTANCE = new LongCodec(); public static final LongCodec INSTANCE = new LongCodec();
public final Decoder<Object> decoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}
};
@Override @Override
public Decoder<Object> getValueDecoder() { public Decoder<Object> getValueDecoder() {
return new Decoder<Object>() { return decoder;
@Override
public Object decode(ByteBuf buf, State state) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}
};
} }
} }

@ -28,24 +28,28 @@ public class StringCodec implements Codec {
public static final StringCodec INSTANCE = new StringCodec(); public static final StringCodec INSTANCE = new StringCodec();
private final Encoder encoder = new Encoder() {
@Override
public byte[] encode(Object in) throws IOException {
return in.toString().getBytes("UTF-8");
}
};
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) {
return buf.toString(CharsetUtil.UTF_8);
}
};
@Override @Override
public Decoder<Object> getValueDecoder() { public Decoder<Object> getValueDecoder() {
return new Decoder<Object>() { return decoder;
@Override
public Object decode(ByteBuf buf, State state) {
return buf.toString(CharsetUtil.UTF_8);
}
};
} }
@Override @Override
public Encoder getValueEncoder() { public Encoder getValueEncoder() {
return new Encoder() { return encoder;
@Override
public byte[] encode(Object in) throws IOException {
return in.toString().getBytes("UTF-8");
}
};
} }
@Override @Override

@ -15,14 +15,18 @@
*/ */
package org.redisson.client.handler; package org.redisson.client.handler;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.Codec;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -63,12 +67,12 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
group.schedule(new Runnable() { group.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
doReConnect(group, connection, 1); tryReconnect(group, connection, 1);
} }
}, 100, TimeUnit.MILLISECONDS); }, 100, TimeUnit.MILLISECONDS);
} }
private void doReConnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) { private void tryReconnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) {
if (connection.isClosed()) { if (connection.isClosed()) {
return; return;
} }
@ -86,20 +90,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
try { try {
if (future.isSuccess()) { if (future.isSuccess()) {
log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr()); log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr());
reconnect(connection, future.channel());
if (connection.getReconnectListener() != null) {
bootstrap.group().execute(new Runnable() {
@Override
public void run() {
// new connection used only to init channel
RedisConnection rc = new RedisConnection(connection.getRedisClient(), future.channel());
connection.getReconnectListener().onReconnect(rc);
connection.updateChannel(future.channel());
}
});
} else {
connection.updateChannel(future.channel());
}
return; return;
} }
} catch (RedisException e) { } catch (RedisException e) {
@ -110,14 +101,48 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
group.schedule(new Runnable() { group.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
doReConnect(group, connection, Math.min(BACKOFF_CAP, attempts + 1)); tryReconnect(group, connection, Math.min(BACKOFF_CAP, attempts + 1));
} }
}, timeout, TimeUnit.MILLISECONDS); }, timeout, TimeUnit.MILLISECONDS);
} }
}); });
} }
private void reconnect(final RedisConnection connection, final Channel channel) {
if (connection.getReconnectListener() != null) {
bootstrap.group().execute(new Runnable() {
@Override
public void run() {
// new connection used only for channel init
RedisConnection rc = new RedisConnection(connection.getRedisClient(), channel);
connection.getReconnectListener().onReconnect(rc);
connection.updateChannel(channel);
resubscribe(connection);
}
});
} else {
connection.updateChannel(channel);
resubscribe(connection);
}
}
private void resubscribe(RedisConnection connection) {
if (connection instanceof RedisPubSubConnection) {
RedisPubSubConnection conn = (RedisPubSubConnection) connection;
for (Entry<String, Codec> entry : conn.getChannels().entrySet()) {
conn.subscribe(entry.getValue(), entry.getKey());
}
for (Entry<String, Codec> entry : conn.getPatternChannels().entrySet()) {
conn.psubscribe(entry.getValue(), entry.getKey());
}
}
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close(); ctx.channel().close();

@ -44,7 +44,21 @@ import io.netty.buffer.ByteBufInputStream;
*/ */
public class JsonJacksonCodec implements Codec { public class JsonJacksonCodec implements Codec {
private ObjectMapper mapObjectMapper = new ObjectMapper(); private final ObjectMapper mapObjectMapper = new ObjectMapper();
private final Encoder encoder = new Encoder() {
@Override
public byte[] encode(Object in) throws IOException {
return mapObjectMapper.writeValueAsBytes(in);
}
};
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return mapObjectMapper.readValue(new ByteBufInputStream(buf), Object.class);
}
};
public JsonJacksonCodec() { public JsonJacksonCodec() {
init(mapObjectMapper); init(mapObjectMapper);
@ -94,24 +108,12 @@ public class JsonJacksonCodec implements Codec {
@Override @Override
public Decoder<Object> getMapValueDecoder() { public Decoder<Object> getMapValueDecoder() {
return new Decoder<Object>() { return decoder;
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return mapObjectMapper.readValue(new ByteBufInputStream(buf), Object.class);
}
};
} }
@Override @Override
public Encoder getMapValueEncoder() { public Encoder getMapValueEncoder() {
return new Encoder() { return encoder;
@Override
public byte[] encode(Object in) throws IOException {
return mapObjectMapper.writeValueAsBytes(in);
}
};
} }
@Override @Override

@ -93,6 +93,51 @@ public class KryoCodec implements Codec {
private final KryoPool kryoPool; private final KryoPool kryoPool;
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
Kryo kryo = null;
try {
kryo = kryoPool.get();
return kryo.readClassAndObject(new Input(new ByteBufInputStream(buf)));
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new RedissonKryoCodecException(e);
} finally {
if (kryo != null) {
kryoPool.yield(kryo);
}
}
}
};
private final Encoder encoder = new Encoder() {
@Override
public byte[] encode(Object in) throws IOException {
Kryo kryo = null;
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Output output = new Output(baos);
kryo = kryoPool.get();
kryo.writeClassAndObject(output, in);
output.close();
return baos.toByteArray();
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new RedissonKryoCodecException(e);
} finally {
if (kryo != null) {
kryoPool.yield(kryo);
}
}
}
};
public KryoCodec() { public KryoCodec() {
this(new KryoPoolImpl(Collections.<Class<?>>emptyList())); this(new KryoPoolImpl(Collections.<Class<?>>emptyList()));
} }
@ -128,54 +173,12 @@ public class KryoCodec implements Codec {
@Override @Override
public Decoder<Object> getValueDecoder() { public Decoder<Object> getValueDecoder() {
return new Decoder<Object>() { return decoder;
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
Kryo kryo = null;
try {
kryo = kryoPool.get();
return kryo.readClassAndObject(new Input(new ByteBufInputStream(buf)));
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new RedissonKryoCodecException(e);
} finally {
if (kryo != null) {
kryoPool.yield(kryo);
}
}
}
};
} }
@Override @Override
public Encoder getValueEncoder() { public Encoder getValueEncoder() {
return new Encoder() { return encoder;
@Override
public byte[] encode(Object in) throws IOException {
Kryo kryo = null;
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Output output = new Output(baos);
kryo = kryoPool.get();
kryo.writeClassAndObject(output, in);
output.close();
return baos.toByteArray();
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new RedissonKryoCodecException(e);
} finally {
if (kryo != null) {
kryoPool.yield(kryo);
}
}
}
};
} }
} }

@ -35,6 +35,32 @@ import io.netty.buffer.ByteBufInputStream;
*/ */
public class SerializationCodec implements Codec { public class SerializationCodec implements Codec {
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
try {
ObjectInputStream inputStream = new ObjectInputStream(new ByteBufInputStream(buf));
return inputStream.readObject();
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(e);
}
}
};
private final Encoder encoder = new Encoder() {
@Override
public byte[] encode(Object in) throws IOException {
ByteArrayOutputStream result = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(result);
outputStream.writeObject(in);
outputStream.close();
return result.toByteArray();
}
};
@Override @Override
public Decoder<Object> getMapValueDecoder() { public Decoder<Object> getMapValueDecoder() {
return getValueDecoder(); return getValueDecoder();
@ -57,34 +83,12 @@ public class SerializationCodec implements Codec {
@Override @Override
public Decoder<Object> getValueDecoder() { public Decoder<Object> getValueDecoder() {
return new Decoder<Object>() { return decoder;
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
try {
ObjectInputStream inputStream = new ObjectInputStream(new ByteBufInputStream(buf));
return inputStream.readObject();
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(e);
}
}
};
} }
@Override @Override
public Encoder getValueEncoder() { public Encoder getValueEncoder() {
return new Encoder() { return encoder;
@Override
public byte[] encode(Object in) throws IOException {
ByteArrayOutputStream result = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(result);
outputStream.writeObject(in);
outputStream.close();
return result.toByteArray();
}
};
} }
} }

@ -374,7 +374,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return null; return null;
} }
return entry.unsubscribe(channelName, new BaseRedisPubSubListener() { Codec entryCodec = entry.getConnection().getChannels().get(channelName);
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override @Override
public boolean onStatus(PubSubType type, String channel) { public boolean onStatus(PubSubType type, String channel) {
@ -390,6 +391,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
}); });
return entryCodec;
} }
@Override @Override
@ -399,7 +401,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return null; return null;
} }
return entry.punsubscribe(channelName, new BaseRedisPubSubListener() { Codec entryCodec = entry.getConnection().getPatternChannels().get(channelName);
entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
@Override @Override
public boolean onStatus(PubSubType type, String channel) { public boolean onStatus(PubSubType type, String channel) {
@ -415,6 +418,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
}); });
return entryCodec;
} }
protected MasterSlaveEntry getEntry(int slot) { protected MasterSlaveEntry getEntry(int slot) {
@ -438,13 +442,24 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
entry.close(); entry.close();
Collection<RedisPubSubListener> listeners = entry.getListeners(channelName); Collection<RedisPubSubListener> listeners = entry.getListeners(channelName);
Codec subscribeCodec = unsubscribe(channelName); if (entry.getConnection().getPatternChannels().get(channelName) != null) {
if (!listeners.isEmpty()) { Codec subscribeCodec = punsubscribe(channelName);
PubSubConnectionEntry newEntry = subscribe(channelName, subscribeCodec); if (!listeners.isEmpty()) {
for (RedisPubSubListener redisPubSubListener : listeners) { PubSubConnectionEntry newEntry = psubscribe(channelName, subscribeCodec);
newEntry.addListener(channelName, redisPubSubListener); for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel-pattern", channelName);
}
} else {
Codec subscribeCodec = unsubscribe(channelName);
if (!listeners.isEmpty()) {
PubSubConnectionEntry newEntry = subscribe(channelName, subscribeCodec);
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel", channelName);
} }
log.debug("resubscribed listeners for '{}' channel", channelName);
} }
} }
} }

@ -18,7 +18,6 @@ package org.redisson.connection;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
@ -33,8 +32,6 @@ import org.redisson.client.protocol.pubsub.PubSubType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.util.internal.PlatformDependent;
public class PubSubConnectionEntry { public class PubSubConnectionEntry {
public enum Status {ACTIVE, INACTIVE} public enum Status {ACTIVE, INACTIVE}
@ -46,7 +43,6 @@ public class PubSubConnectionEntry {
private final RedisPubSubConnection conn; private final RedisPubSubConnection conn;
private final int subscriptionsPerConnection; private final int subscriptionsPerConnection;
private final Map<String, Codec> channel2Codec = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<String, Queue<RedisPubSubListener>> channelListeners = new ConcurrentHashMap<String, Queue<RedisPubSubListener>>(); private final ConcurrentMap<String, Queue<RedisPubSubListener>> channelListeners = new ConcurrentHashMap<String, Queue<RedisPubSubListener>>();
public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) { public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
@ -127,12 +123,10 @@ public class PubSubConnectionEntry {
} }
public void subscribe(Codec codec, String channelName) { public void subscribe(Codec codec, String channelName) {
channel2Codec.put(channelName, codec);
conn.subscribe(codec, channelName); conn.subscribe(codec, channelName);
} }
public void psubscribe(Codec codec, String pattern) { public void psubscribe(Codec codec, String pattern) {
channel2Codec.put(pattern, codec);
conn.psubscribe(codec, pattern); conn.psubscribe(codec, pattern);
} }
@ -141,7 +135,7 @@ public class PubSubConnectionEntry {
conn.subscribe(codec, channel); conn.subscribe(codec, channel);
} }
public Codec unsubscribe(final String channel, RedisPubSubListener listener) { public void unsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() { conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
@Override @Override
public boolean onStatus(PubSubType type, String ch) { public boolean onStatus(PubSubType type, String ch) {
@ -155,7 +149,6 @@ public class PubSubConnectionEntry {
}); });
conn.addOneShotListener(listener); conn.addOneShotListener(listener);
conn.unsubscribe(channel); conn.unsubscribe(channel);
return channel2Codec.remove(channel);
} }
private void removeListeners(String channel) { private void removeListeners(String channel) {
@ -171,7 +164,7 @@ public class PubSubConnectionEntry {
subscribedChannelsAmount.release(); subscribedChannelsAmount.release();
} }
public Codec punsubscribe(final String channel, RedisPubSubListener listener) { public void punsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() { conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
@Override @Override
public boolean onStatus(PubSubType type, String ch) { public boolean onStatus(PubSubType type, String ch) {
@ -184,7 +177,6 @@ public class PubSubConnectionEntry {
}); });
conn.addOneShotListener(listener); conn.addOneShotListener(listener);
conn.punsubscribe(channel); conn.punsubscribe(channel);
return channel2Codec.remove(channel);
} }

Loading…
Cancel
Save