Merge branch 'master' into 3.0.0

pull/1821/head
Nikita 7 years ago
commit 64501f1cc5

@ -28,9 +28,9 @@ import io.netty.util.concurrent.Promise;
*/ */
public class SubscribeListener extends BaseRedisPubSubListener { public class SubscribeListener extends BaseRedisPubSubListener {
Promise<Void> promise = ImmediateEventExecutor.INSTANCE.newPromise(); private final Promise<Void> promise = ImmediateEventExecutor.INSTANCE.newPromise();
String name; private final String name;
PubSubType type; private final PubSubType type;
public SubscribeListener(String name, PubSubType type) { public SubscribeListener(String name, PubSubType type) {
super(); super();

@ -92,9 +92,14 @@ public class CommandDecoder extends ReplayingDecoder<State> {
state().setDecoderState(null); state().setDecoderState(null);
if (data == null) { if (data == null) {
try {
while (in.writerIndex() > in.readerIndex()) { while (in.writerIndex() > in.readerIndex()) {
decode(in, null, null, ctx.channel()); decode(in, null, null, ctx.channel());
} }
} catch (Exception e) {
log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e);
throw e;
}
} else if (data instanceof CommandData) { } else if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data; CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
try { try {
@ -104,6 +109,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
decode(in, cmd, null, ctx.channel()); decode(in, cmd, null, ctx.channel());
} }
} catch (Exception e) { } catch (Exception e) {
log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e);
cmd.tryFailure(e); cmd.tryFailure(e);
throw e; throw e;
} }

@ -53,12 +53,10 @@ public class CommandsQueue extends ChannelDuplexHandler {
private final Queue<QueueCommandHolder> queue = PlatformDependent.newMpscQueue(); private final Queue<QueueCommandHolder> queue = PlatformDependent.newMpscQueue();
private volatile boolean isInactive;
private final ChannelFutureListener listener = new ChannelFutureListener() { private final ChannelFutureListener listener = new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess() && !isInactive) { if (!future.isSuccess() && future.channel().isActive()) {
sendNextCommand(future.channel()); sendNextCommand(future.channel());
} }
} }
@ -72,7 +70,6 @@ public class CommandsQueue extends ChannelDuplexHandler {
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
isInactive = true;
while (true) { while (true) {
QueueCommandHolder command = queue.poll(); QueueCommandHolder command = queue.poll();
if (command == null) { if (command == null) {

@ -15,7 +15,6 @@
*/ */
package org.redisson.client.handler; package org.redisson.client.handler;
import java.net.SocketAddress;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -31,9 +30,9 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import io.netty.util.Timer; import io.netty.util.Timer;
@ -120,7 +119,11 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
if (future.isSuccess()) { if (future.isSuccess()) {
final Channel channel = future.channel(); final Channel channel = future.channel();
if (channel.localAddress().equals(channel.remoteAddress())) {
channel.close();
log.error("local address and remote address are the same! connected to: {}, localAddress: {} remoteAddress: {}",
connection.getRedisClient().getAddr(), channel.localAddress(), channel.remoteAddress());
} else {
RedisConnection c = RedisConnection.getFrom(channel); RedisConnection c = RedisConnection.getFrom(channel);
c.getConnectionPromise().addListener(new FutureListener<RedisConnection>() { c.getConnectionPromise().addListener(new FutureListener<RedisConnection>() {
@Override @Override
@ -136,6 +139,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
}); });
return; return;
} }
}
reconnect(connection, nextAttempt); reconnect(connection, nextAttempt);
} }

@ -49,6 +49,12 @@ public class AvroJacksonCodec extends JsonJacksonCodec {
this.schema = schema; this.schema = schema;
} }
@Override
public AvroMapper copy() {
_checkInvalidCopy(AvroExtendedMapper.class);
return new AvroExtendedMapper(type, schema);
}
@Override @Override
public void writeValue(OutputStream out, Object value) public void writeValue(OutputStream out, Object value)
throws IOException, JsonGenerationException, JsonMappingException { throws IOException, JsonGenerationException, JsonMappingException {

@ -44,7 +44,7 @@ public class RedissonCodecTest extends BaseTest {
private Codec cborCodec = new CborJacksonCodec(); private Codec cborCodec = new CborJacksonCodec();
private Codec fstCodec = new FstCodec(); private Codec fstCodec = new FstCodec();
private Codec snappyCodec = new SnappyCodec(); private Codec snappyCodec = new SnappyCodec();
private Codec msgPackCodec = new MsgPackJacksonCodec(); // private Codec msgPackCodec = new MsgPackJacksonCodec();
private Codec lz4Codec = new LZ4Codec(); private Codec lz4Codec = new LZ4Codec();
private Codec jsonListOfStringCodec = new JsonJacksonMapCodec( private Codec jsonListOfStringCodec = new JsonJacksonMapCodec(
new TypeReference<String>() {}, new TypeReference<List<String>>() {}); new TypeReference<String>() {}, new TypeReference<List<String>>() {});
@ -67,14 +67,14 @@ public class RedissonCodecTest extends BaseTest {
test(redisson); test(redisson);
} }
@Test // @Test
public void testMsgPack() { // public void testMsgPack() {
Config config = createConfig(); // Config config = createConfig();
config.setCodec(msgPackCodec); // config.setCodec(msgPackCodec);
RedissonClient redisson = Redisson.create(config); // RedissonClient redisson = Redisson.create(config);
//
test(redisson); // test(redisson);
} // }
@Test @Test
public void testSmile() { public void testSmile() {

@ -244,7 +244,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS)); Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
RLock lock = redisson.getFairLock("lock"); RLock lock = redisson.getFairLock("lock");
await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock.isLocked()); await().atMost(redisson.getConfig().getLockWatchdogTimeout() + 1000, TimeUnit.MILLISECONDS).until(() -> !lock.isLocked());
} }
@Test @Test

@ -407,7 +407,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
}); });
RReadWriteLock lock1 = redisson.getReadWriteLock("lock"); RReadWriteLock lock1 = redisson.getReadWriteLock("lock");
await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock1.writeLock().isLocked()); await().atMost(redisson.getConfig().getLockWatchdogTimeout() + 1000, TimeUnit.MILLISECONDS).until(() -> !lock1.writeLock().isLocked());
} }
@Test @Test

@ -280,7 +280,7 @@ public class RedissonSetCacheReactiveTest extends BaseReactiveTest {
@Test @Test
public void testScheduler() throws InterruptedException { public void testScheduler() throws InterruptedException {
RSetCacheReactive<String> cache = redisson.getSetCache("simple33", new MsgPackJacksonCodec()); RSetCacheReactive<String> cache = redisson.getSetCache("simple33");
Assert.assertFalse(sync(cache.contains("33"))); Assert.assertFalse(sync(cache.contains("33")));
Assert.assertTrue(sync(cache.add("33", 5, TimeUnit.SECONDS))); Assert.assertTrue(sync(cache.add("33", 5, TimeUnit.SECONDS)));

@ -438,7 +438,7 @@ public class RedissonSetCacheTest extends BaseTest {
@Test @Test
public void testScheduler() throws InterruptedException { public void testScheduler() throws InterruptedException {
RSetCache<String> cache = redisson.getSetCache("simple33", new MsgPackJacksonCodec()); RSetCache<String> cache = redisson.getSetCache("simple33");
Assert.assertFalse(cache.contains("33")); Assert.assertFalse(cache.contains("33"));
Assert.assertTrue(cache.add("33", 5, TimeUnit.SECONDS)); Assert.assertTrue(cache.add("33", 5, TimeUnit.SECONDS));

Loading…
Cancel
Save