diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index f70246516..c4ad9b142 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -348,7 +348,9 @@ public class CommandExecutorService implements CommandExecutor { attemptPromise.setFailure(ex.get()); return; } - attemptPromise.cancel(true); + if (!attemptPromise.cancel(false)) { + return; + } int count = attempt + 1; async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, count); diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index 59a292afe..15ffabd31 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CancellationException; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; @@ -44,6 +45,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.Promise; import io.netty.util.internal.PlatformDependent; /** @@ -125,7 +127,10 @@ public class CommandDecoder extends ReplayingDecoder { } if (i == commands.getCommands().size()) { - commands.getPromise().setSuccess(null); + Promise promise = commands.getPromise(); + if (!promise.trySuccess(null) && promise.isCancelled()) { + log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data); + } ctx.channel().attr(CommandsQueue.REPLAY).remove(); ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); @@ -150,7 +155,7 @@ public class CommandDecoder extends ReplayingDecoder { String result = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); in.skipBytes(2); - handleResult(data, parts, result, false); + handleResult(data, parts, result, false, channel); } else if (code == '-') { String error = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); in.skipBytes(2); @@ -170,14 +175,14 @@ public class CommandDecoder extends ReplayingDecoder { String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); in.skipBytes(2); Object result = Long.valueOf(status); - handleResult(data, parts, result, false); + handleResult(data, parts, result, false, channel); } else if (code == '$') { ByteBuf buf = readBytes(in); Object result = null; if (buf != null) { result = decoder(data, parts, currentDecoder).decode(buf, state()); } - handleResult(data, parts, result, false); + handleResult(data, parts, result, false, channel); } else if (code == '*') { long size = readLong(in); // state().setSize(size); @@ -229,7 +234,7 @@ public class CommandDecoder extends ReplayingDecoder { } if (data != null) { - handleResult(data, parts, result, true); + handleResult(data, parts, result, true, channel); } else { RedisPubSubConnection pubSubConnection = (RedisPubSubConnection)channel.attr(RedisPubSubConnection.CONNECTION).get(); if (result instanceof PubSubStatusMessage) { @@ -242,7 +247,7 @@ public class CommandDecoder extends ReplayingDecoder { } } - private void handleResult(CommandData data, List parts, Object result, boolean multiResult) { + private void handleResult(CommandData data, List parts, Object result, boolean multiResult, Channel channel) { if (data != null) { if (multiResult) { result = data.getCommand().getConvertor().convertMulti(result); @@ -253,11 +258,9 @@ public class CommandDecoder extends ReplayingDecoder { if (parts != null) { parts.add(result); } else { - if (data.getPromise().isDone()) { - log.error("promise already done, something is wrong! result: {} promise command {}", result, data); - return; + if (!data.getPromise().trySuccess(result) && data.getPromise().isCancelled()) { + log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, data, result); } - data.getPromise().setSuccess(result); } } diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index f41dce6f3..7ab40cb2e 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; +import org.redisson.client.RedisException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; @@ -179,20 +180,8 @@ abstract class BaseLoadBalancer implements LoadBalancer { return conn; } try { - conn = entry.getClient().connect(); - if (config.getPassword() != null) { - conn.sync(RedisCommands.AUTH, config.getPassword()); - } - if (config.getDatabase() != 0) { - conn.sync(RedisCommands.SELECT, config.getDatabase()); - } - if (config.getClientName() != null) { - conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); - } - log.debug("new connection created: {}", conn); - - return conn; - } catch (RedisConnectionException e) { + return entry.connect(config); + } catch (RedisException e) { entry.getConnectionsSemaphore().release(); // TODO connection scoring log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr()); diff --git a/src/main/java/org/redisson/connection/ConnectionEntry.java b/src/main/java/org/redisson/connection/ConnectionEntry.java index 025915377..e1f358af7 100644 --- a/src/main/java/org/redisson/connection/ConnectionEntry.java +++ b/src/main/java/org/redisson/connection/ConnectionEntry.java @@ -19,11 +19,17 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Semaphore; +import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; +import org.redisson.client.protocol.RedisCommands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ConnectionEntry { + private final Logger log = LoggerFactory.getLogger(getClass()); + private volatile boolean freezed; private final RedisClient client; @@ -55,4 +61,21 @@ public class ConnectionEntry { return connections; } + public RedisConnection connect(MasterSlaveServersConfig config) { + RedisConnection conn = client.connect(); + if (config.getPassword() != null) { + conn.sync(RedisCommands.AUTH, config.getPassword()); + } + if (config.getDatabase() != 0) { + conn.sync(RedisCommands.SELECT, config.getDatabase()); + } + if (config.getClientName() != null) { + conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); + } + + log.debug("new connection created: {}", conn); + + return conn; + } + } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 78f6a72e0..dc09b1398 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -24,6 +24,7 @@ import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; +import org.redisson.client.RedisException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; @@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory; //TODO ping support public class MasterSlaveEntry { - private final Logger log = LoggerFactory.getLogger(getClass()); + final Logger log = LoggerFactory.getLogger(getClass()); LoadBalancer slaveBalancer; volatile ConnectionEntry masterEntry; @@ -124,20 +125,8 @@ public class MasterSlaveEntry { } try { - conn = masterEntry.getClient().connect(); - if (config.getPassword() != null) { - conn.sync(RedisCommands.AUTH, config.getPassword()); - } - if (config.getDatabase() != 0) { - conn.sync(RedisCommands.SELECT, config.getDatabase()); - } - if (config.getClientName() != null) { - conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); - } - log.debug("new connection created: {}", conn); - - return conn; - } catch (RedisConnectionException e) { + return masterEntry.connect(config); + } catch (RedisException e) { masterEntry.getConnectionsSemaphore().release(); throw e; } diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index 23cc991fa..83b384f5f 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -22,13 +22,9 @@ import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class SingleEntry extends MasterSlaveEntry { - private final Logger log = LoggerFactory.getLogger(getClass()); - public SingleEntry(Codec codec, ConnectionManager connectionManager, MasterSlaveServersConfig config) { super(codec, connectionManager, config); }