Connection timeout handling minor improvements. #199

pull/243/head
Nikita 10 years ago
parent 7f95eb99e9
commit fcd72d9ba8

@ -348,7 +348,9 @@ public class CommandExecutorService implements CommandExecutor {
attemptPromise.setFailure(ex.get());
return;
}
attemptPromise.cancel(true);
if (!attemptPromise.cancel(false)) {
return;
}
int count = attempt + 1;
async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, count);

@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
@ -44,6 +45,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
/**
@ -125,7 +127,10 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
if (i == commands.getCommands().size()) {
commands.getPromise().setSuccess(null);
Promise<Void> promise = commands.getPromise();
if (!promise.trySuccess(null) && promise.isCancelled()) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data);
}
ctx.channel().attr(CommandsQueue.REPLAY).remove();
ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
@ -150,7 +155,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
String result = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2);
handleResult(data, parts, result, false);
handleResult(data, parts, result, false, channel);
} else if (code == '-') {
String error = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2);
@ -170,14 +175,14 @@ public class CommandDecoder extends ReplayingDecoder<State> {
String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2);
Object result = Long.valueOf(status);
handleResult(data, parts, result, false);
handleResult(data, parts, result, false, channel);
} else if (code == '$') {
ByteBuf buf = readBytes(in);
Object result = null;
if (buf != null) {
result = decoder(data, parts, currentDecoder).decode(buf, state());
}
handleResult(data, parts, result, false);
handleResult(data, parts, result, false, channel);
} else if (code == '*') {
long size = readLong(in);
// state().setSize(size);
@ -229,7 +234,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
if (data != null) {
handleResult(data, parts, result, true);
handleResult(data, parts, result, true, channel);
} else {
RedisPubSubConnection pubSubConnection = (RedisPubSubConnection)channel.attr(RedisPubSubConnection.CONNECTION).get();
if (result instanceof PubSubStatusMessage) {
@ -242,7 +247,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
}
private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean multiResult) {
private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean multiResult, Channel channel) {
if (data != null) {
if (multiResult) {
result = data.getCommand().getConvertor().convertMulti(result);
@ -253,11 +258,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (parts != null) {
parts.add(result);
} else {
if (data.getPromise().isDone()) {
log.error("promise already done, something is wrong! result: {} promise command {}", result, data);
return;
if (!data.getPromise().trySuccess(result) && data.getPromise().isCancelled()) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, data, result);
}
data.getPromise().setSuccess(result);
}
}

@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
@ -179,20 +180,8 @@ abstract class BaseLoadBalancer implements LoadBalancer {
return conn;
}
try {
conn = entry.getClient().connect();
if (config.getPassword() != null) {
conn.sync(RedisCommands.AUTH, config.getPassword());
}
if (config.getDatabase() != 0) {
conn.sync(RedisCommands.SELECT, config.getDatabase());
}
if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
}
log.debug("new connection created: {}", conn);
return conn;
} catch (RedisConnectionException e) {
return entry.connect(config);
} catch (RedisException e) {
entry.getConnectionsSemaphore().release();
// TODO connection scoring
log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr());

@ -19,11 +19,17 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConnectionEntry {
private final Logger log = LoggerFactory.getLogger(getClass());
private volatile boolean freezed;
private final RedisClient client;
@ -55,4 +61,21 @@ public class ConnectionEntry {
return connections;
}
public RedisConnection connect(MasterSlaveServersConfig config) {
RedisConnection conn = client.connect();
if (config.getPassword() != null) {
conn.sync(RedisCommands.AUTH, config.getPassword());
}
if (config.getDatabase() != 0) {
conn.sync(RedisCommands.SELECT, config.getDatabase());
}
if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
}
log.debug("new connection created: {}", conn);
return conn;
}
}

@ -24,6 +24,7 @@ import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory;
//TODO ping support
public class MasterSlaveEntry {
private final Logger log = LoggerFactory.getLogger(getClass());
final Logger log = LoggerFactory.getLogger(getClass());
LoadBalancer slaveBalancer;
volatile ConnectionEntry masterEntry;
@ -124,20 +125,8 @@ public class MasterSlaveEntry {
}
try {
conn = masterEntry.getClient().connect();
if (config.getPassword() != null) {
conn.sync(RedisCommands.AUTH, config.getPassword());
}
if (config.getDatabase() != 0) {
conn.sync(RedisCommands.SELECT, config.getDatabase());
}
if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
}
log.debug("new connection created: {}", conn);
return conn;
} catch (RedisConnectionException e) {
return masterEntry.connect(config);
} catch (RedisException e) {
masterEntry.getConnectionsSemaphore().release();
throw e;
}

@ -22,13 +22,9 @@ import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SingleEntry extends MasterSlaveEntry {
private final Logger log = LoggerFactory.getLogger(getClass());
public SingleEntry(Codec codec, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super(codec, connectionManager, config);
}

Loading…
Cancel
Save