refactoring

pull/1344/head
Nikita 7 years ago
parent b534c7f856
commit f0953279b4

@ -28,9 +28,9 @@ import io.netty.util.concurrent.Promise;
*/
public class SubscribeListener extends BaseRedisPubSubListener {
Promise<Void> promise = ImmediateEventExecutor.INSTANCE.newPromise();
String name;
PubSubType type;
private final Promise<Void> promise = ImmediateEventExecutor.INSTANCE.newPromise();
private final String name;
private final PubSubType type;
public SubscribeListener(String name, PubSubType type) {
super();

@ -92,8 +92,13 @@ public class CommandDecoder extends ReplayingDecoder<State> {
state().setDecoderState(null);
if (data == null) {
while (in.writerIndex() > in.readerIndex()) {
decode(in, null, null, ctx.channel());
try {
while (in.writerIndex() > in.readerIndex()) {
decode(in, null, null, ctx.channel());
}
} catch (Exception e) {
log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e);
throw e;
}
} else if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
@ -104,6 +109,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
decode(in, cmd, null, ctx.channel());
}
} catch (Exception e) {
log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e);
cmd.tryFailure(e);
throw e;
}

@ -53,12 +53,10 @@ public class CommandsQueue extends ChannelDuplexHandler {
private final Queue<QueueCommandHolder> queue = PlatformDependent.newMpscQueue();
private volatile boolean isInactive;
private final ChannelFutureListener listener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess() && !isInactive) {
if (!future.isSuccess() && future.channel().isActive()) {
sendNextCommand(future.channel());
}
}
@ -72,7 +70,6 @@ public class CommandsQueue extends ChannelDuplexHandler {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
isInactive = true;
while (true) {
QueueCommandHolder command = queue.poll();
if (command == null) {

@ -15,7 +15,6 @@
*/
package org.redisson.client.handler;
import java.net.SocketAddress;
import java.util.Map.Entry;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@ -31,9 +30,9 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.Timeout;
import io.netty.util.Timer;
@ -120,21 +119,26 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
if (future.isSuccess()) {
final Channel channel = future.channel();
RedisConnection c = RedisConnection.getFrom(channel);
c.getConnectionPromise().addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (future.isSuccess()) {
refresh(connection, channel);
log.debug("{} connected to {}, command: {}", connection, connection.getRedisClient().getAddr(), connection.getCurrentCommand());
} else {
log.warn("Can't connect " + connection + " to " + connection.getRedisClient().getAddr(), future.cause());
if (channel.localAddress().equals(channel.remoteAddress())) {
channel.close();
log.error("local address and remote address are the same! connected to: {}, localAddress: {} remoteAddress: {}",
connection.getRedisClient().getAddr(), channel.localAddress(), channel.remoteAddress());
} else {
RedisConnection c = RedisConnection.getFrom(channel);
c.getConnectionPromise().addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (future.isSuccess()) {
refresh(connection, channel);
log.debug("{} connected to {}, command: {}", connection, connection.getRedisClient().getAddr(), connection.getCurrentCommand());
} else {
log.warn("Can't connect " + connection + " to " + connection.getRedisClient().getAddr(), future.cause());
}
}
}
});
return;
});
return;
}
}
reconnect(connection, nextAttempt);

@ -244,7 +244,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
RLock lock = redisson.getFairLock("lock");
await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock.isLocked());
await().atMost(redisson.getConfig().getLockWatchdogTimeout() + 1000, TimeUnit.MILLISECONDS).until(() -> !lock.isLocked());
}
@Test

@ -407,7 +407,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
});
RReadWriteLock lock1 = redisson.getReadWriteLock("lock");
await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock1.writeLock().isLocked());
await().atMost(redisson.getConfig().getLockWatchdogTimeout() + 1000, TimeUnit.MILLISECONDS).until(() -> !lock1.writeLock().isLocked());
}
@Test

@ -280,7 +280,7 @@ public class RedissonSetCacheReactiveTest extends BaseReactiveTest {
@Test
public void testScheduler() throws InterruptedException {
RSetCacheReactive<String> cache = redisson.getSetCache("simple33", new MsgPackJacksonCodec());
RSetCacheReactive<String> cache = redisson.getSetCache("simple33");
Assert.assertFalse(sync(cache.contains("33")));
Assert.assertTrue(sync(cache.add("33", 5, TimeUnit.SECONDS)));

@ -438,7 +438,7 @@ public class RedissonSetCacheTest extends BaseTest {
@Test
public void testScheduler() throws InterruptedException {
RSetCache<String> cache = redisson.getSetCache("simple33", new MsgPackJacksonCodec());
RSetCache<String> cache = redisson.getSetCache("simple33");
Assert.assertFalse(cache.contains("33"));
Assert.assertTrue(cache.add("33", 5, TimeUnit.SECONDS));

Loading…
Cancel
Save