Merge branch '2.2.x'

pull/574/merge
Nikita 9 years ago
commit b32b31e094

@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
@ -57,44 +58,59 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<?, ?>> {
private static final Map<Long, byte[]> longCache = new HashMap<Long, byte[]>();
@Override
protected void encode(ChannelHandlerContext ctx, CommandData<?, ?> msg, ByteBuf out) throws Exception {
out.writeByte(ARGS_PREFIX);
int len = 1 + msg.getParams().length;
if (msg.getCommand().getSubName() != null) {
len++;
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
try {
super.write(ctx, msg, promise);
} catch (Exception e) {
promise.tryFailure(e);
throw e;
}
out.writeBytes(convert(len));
out.writeBytes(CRLF);
writeArgument(out, msg.getCommand().getName().getBytes("UTF-8"));
if (msg.getCommand().getSubName() != null) {
writeArgument(out, msg.getCommand().getSubName().getBytes("UTF-8"));
}
int i = 1;
for (Object param : msg.getParams()) {
Encoder encoder = paramsEncoder;
if (msg.getCommand().getInParamType().size() == 1) {
if (msg.getCommand().getInParamIndex() == i
&& msg.getCommand().getInParamType().get(0) == ValueType.OBJECT) {
encoder = msg.getCodec().getValueEncoder();
} else if (msg.getCommand().getInParamIndex() <= i
&& msg.getCommand().getInParamType().get(0) != ValueType.OBJECT) {
encoder = selectEncoder(msg, i - msg.getCommand().getInParamIndex());
}
} else {
if (msg.getCommand().getInParamIndex() <= i) {
int paramNum = i - msg.getCommand().getInParamIndex();
encoder = selectEncoder(msg, paramNum);
}
@Override
protected void encode(ChannelHandlerContext ctx, CommandData<?, ?> msg, ByteBuf out) throws Exception {
try {
out.writeByte(ARGS_PREFIX);
int len = 1 + msg.getParams().length;
if (msg.getCommand().getSubName() != null) {
len++;
}
out.writeBytes(convert(len));
out.writeBytes(CRLF);
writeArgument(out, msg.getCommand().getName().getBytes("UTF-8"));
if (msg.getCommand().getSubName() != null) {
writeArgument(out, msg.getCommand().getSubName().getBytes("UTF-8"));
}
int i = 1;
for (Object param : msg.getParams()) {
Encoder encoder = paramsEncoder;
if (msg.getCommand().getInParamType().size() == 1) {
if (msg.getCommand().getInParamIndex() == i
&& msg.getCommand().getInParamType().get(0) == ValueType.OBJECT) {
encoder = msg.getCodec().getValueEncoder();
} else if (msg.getCommand().getInParamIndex() <= i
&& msg.getCommand().getInParamType().get(0) != ValueType.OBJECT) {
encoder = selectEncoder(msg, i - msg.getCommand().getInParamIndex());
}
} else {
if (msg.getCommand().getInParamIndex() <= i) {
int paramNum = i - msg.getCommand().getInParamIndex();
encoder = selectEncoder(msg, paramNum);
}
}
writeArgument(out, encoder.encode(param));
i++;
}
writeArgument(out, encoder.encode(param));
i++;
}
if (log.isTraceEnabled()) {
log.trace("channel: {} message: {}", ctx.channel(), out.toString(CharsetUtil.UTF_8));
if (log.isTraceEnabled()) {
log.trace("channel: {} message: {}", ctx.channel(), out.toString(CharsetUtil.UTF_8));
}
} catch (Exception e) {
msg.getPromise().tryFailure(e);
throw e;
}
}

@ -146,7 +146,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return newSucceededFuture(connection);
}
RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getConnectTimeout());
RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts());
final Promise<RedisConnection> result = newPromise();
Future<RedisConnection> future = client.connectAsync();
future.addListener(new FutureListener<RedisConnection>() {

@ -92,7 +92,7 @@ public interface ConnectionManager {
Future<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command);
RedisClient createClient(String host, int port, int timeout);
RedisClient createClient(String host, int port, int timeout, int commandTimeout);
RedisClient createClient(NodeType type, String host, int port);

@ -107,7 +107,7 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager {
if (connection != null) {
return connection;
}
RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getConnectTimeout());
RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts());
try {
connection = client.connect();
Promise<RedisConnection> future = newPromise();

@ -284,7 +284,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public RedisClient createClient(NodeType type, String host, int port) {
RedisClient client = createClient(host, port, config.getConnectTimeout());
RedisClient client = createClient(host, port, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts());
clients.add(new RedisClientEntry(client, this, type));
return client;
}
@ -295,8 +295,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public RedisClient createClient(String host, int port, int timeout) {
return new RedisClient(group, socketChannelClass, host, port, timeout);
public RedisClient createClient(String host, int port, int timeout, int commandTimeout) {
return new RedisClient(group, socketChannelClass, host, port, timeout, commandTimeout);
}
@Override

@ -64,7 +64,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
final MasterSlaveServersConfig c = create(cfg);
for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout());
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts());
try {
RedisConnection connection = client.connect();
if (!connection.isActive()) {
@ -140,7 +140,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
private Future<RedisPubSubConnection> registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) {
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout());
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts());
RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client);
if (oldClient != null) {
return newSucceededFuture(null);

@ -0,0 +1,31 @@
package org.redisson;
import org.junit.Test;
import org.redisson.client.RedisException;
import org.redisson.Config;
public class CommandHandlersTest extends BaseTest {
@Test(expected = RedisException.class)
public void testEncoder() throws InterruptedException {
Config config = createConfig();
config.setCodec(new ErrorsCodec());
RedissonClient redisson = Redisson.create(config);
redisson.getBucket("1234").set("1234");
}
@Test(expected = RedisException.class)
public void testDecoder() {
redisson.getBucket("1234").set("1234");
Config config = createConfig();
config.setCodec(new ErrorsCodec());
RedissonClient redisson = Redisson.create(config);
redisson.getBucket("1234").get();
}
}
Loading…
Cancel
Save