diff --git a/src/main/java/org/redisson/client/handler/CommandEncoder.java b/src/main/java/org/redisson/client/handler/CommandEncoder.java index 18d9f05f0..bb112d3db 100644 --- a/src/main/java/org/redisson/client/handler/CommandEncoder.java +++ b/src/main/java/org/redisson/client/handler/CommandEncoder.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelHandler.Sharable; import io.netty.handler.codec.MessageToByteEncoder; import io.netty.util.CharsetUtil; @@ -57,44 +58,59 @@ public class CommandEncoder extends MessageToByteEncoder> { private static final Map longCache = new HashMap(); @Override - protected void encode(ChannelHandlerContext ctx, CommandData msg, ByteBuf out) throws Exception { - out.writeByte(ARGS_PREFIX); - int len = 1 + msg.getParams().length; - if (msg.getCommand().getSubName() != null) { - len++; + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + try { + super.write(ctx, msg, promise); + } catch (Exception e) { + promise.tryFailure(e); + throw e; } - out.writeBytes(convert(len)); - out.writeBytes(CRLF); - - writeArgument(out, msg.getCommand().getName().getBytes("UTF-8")); - if (msg.getCommand().getSubName() != null) { - writeArgument(out, msg.getCommand().getSubName().getBytes("UTF-8")); - } - int i = 1; - for (Object param : msg.getParams()) { - Encoder encoder = paramsEncoder; - if (msg.getCommand().getInParamType().size() == 1) { - if (msg.getCommand().getInParamIndex() == i - && msg.getCommand().getInParamType().get(0) == ValueType.OBJECT) { - encoder = msg.getCodec().getValueEncoder(); - } else if (msg.getCommand().getInParamIndex() <= i - && msg.getCommand().getInParamType().get(0) != ValueType.OBJECT) { - encoder = selectEncoder(msg, i - msg.getCommand().getInParamIndex()); - } - } else { - if (msg.getCommand().getInParamIndex() <= i) { - int paramNum = i - msg.getCommand().getInParamIndex(); - encoder = selectEncoder(msg, paramNum); + } + + @Override + protected void encode(ChannelHandlerContext ctx, CommandData msg, ByteBuf out) throws Exception { + try { + out.writeByte(ARGS_PREFIX); + int len = 1 + msg.getParams().length; + if (msg.getCommand().getSubName() != null) { + len++; + } + out.writeBytes(convert(len)); + out.writeBytes(CRLF); + + writeArgument(out, msg.getCommand().getName().getBytes("UTF-8")); + if (msg.getCommand().getSubName() != null) { + writeArgument(out, msg.getCommand().getSubName().getBytes("UTF-8")); + } + int i = 1; + for (Object param : msg.getParams()) { + Encoder encoder = paramsEncoder; + if (msg.getCommand().getInParamType().size() == 1) { + if (msg.getCommand().getInParamIndex() == i + && msg.getCommand().getInParamType().get(0) == ValueType.OBJECT) { + encoder = msg.getCodec().getValueEncoder(); + } else if (msg.getCommand().getInParamIndex() <= i + && msg.getCommand().getInParamType().get(0) != ValueType.OBJECT) { + encoder = selectEncoder(msg, i - msg.getCommand().getInParamIndex()); + } + } else { + if (msg.getCommand().getInParamIndex() <= i) { + int paramNum = i - msg.getCommand().getInParamIndex(); + encoder = selectEncoder(msg, paramNum); + } } + + writeArgument(out, encoder.encode(param)); + + i++; } - - writeArgument(out, encoder.encode(param)); - - i++; - } - - if (log.isTraceEnabled()) { - log.trace("channel: {} message: {}", ctx.channel(), out.toString(CharsetUtil.UTF_8)); + + if (log.isTraceEnabled()) { + log.trace("channel: {} message: {}", ctx.channel(), out.toString(CharsetUtil.UTF_8)); + } + } catch (Exception e) { + msg.getPromise().tryFailure(e); + throw e; } } diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 4a51c349c..c19bfb80d 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -146,7 +146,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return newSucceededFuture(connection); } - RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getConnectTimeout()); + RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts()); final Promise result = newPromise(); Future future = client.connectAsync(); future.addListener(new FutureListener() { diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 0b1243b7f..13ced9a9b 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -92,7 +92,7 @@ public interface ConnectionManager { Future connectionWriteOp(NodeSource source, RedisCommand command); - RedisClient createClient(String host, int port, int timeout); + RedisClient createClient(String host, int port, int timeout, int commandTimeout); RedisClient createClient(NodeType type, String host, int port); diff --git a/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java b/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java index b7d894788..539134d7f 100644 --- a/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java +++ b/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java @@ -107,7 +107,7 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager { if (connection != null) { return connection; } - RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getConnectTimeout()); + RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts()); try { connection = client.connect(); Promise future = newPromise(); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 0d39d437d..1ebcf75fa 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -284,7 +284,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public RedisClient createClient(NodeType type, String host, int port) { - RedisClient client = createClient(host, port, config.getConnectTimeout()); + RedisClient client = createClient(host, port, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts()); clients.add(new RedisClientEntry(client, this, type)); return client; } @@ -295,8 +295,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public RedisClient createClient(String host, int port, int timeout) { - return new RedisClient(group, socketChannelClass, host, port, timeout); + public RedisClient createClient(String host, int port, int timeout, int commandTimeout) { + return new RedisClient(group, socketChannelClass, host, port, timeout, commandTimeout); } @Override diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 063908b48..8a7d0bda1 100755 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -64,7 +64,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { final MasterSlaveServersConfig c = create(cfg); for (URI addr : cfg.getSentinelAddresses()) { - RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout()); + RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts()); try { RedisConnection connection = client.connect(); if (!connection.isActive()) { @@ -140,7 +140,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } private Future registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) { - RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout()); + RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts()); RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client); if (oldClient != null) { return newSucceededFuture(null); diff --git a/src/test/java/org/redisson/CommandHandlersTest.java b/src/test/java/org/redisson/CommandHandlersTest.java new file mode 100644 index 000000000..6bff54783 --- /dev/null +++ b/src/test/java/org/redisson/CommandHandlersTest.java @@ -0,0 +1,31 @@ +package org.redisson; + +import org.junit.Test; +import org.redisson.client.RedisException; +import org.redisson.Config; + +public class CommandHandlersTest extends BaseTest { + + @Test(expected = RedisException.class) + public void testEncoder() throws InterruptedException { + Config config = createConfig(); + config.setCodec(new ErrorsCodec()); + + RedissonClient redisson = Redisson.create(config); + + redisson.getBucket("1234").set("1234"); + } + + @Test(expected = RedisException.class) + public void testDecoder() { + redisson.getBucket("1234").set("1234"); + + Config config = createConfig(); + config.setCodec(new ErrorsCodec()); + + RedissonClient redisson = Redisson.create(config); + + redisson.getBucket("1234").get(); + } + +}