few fixes

pull/297/head
Nikita 9 years ago
parent 28532a6ec7
commit 2301b3f6ab

@ -523,10 +523,6 @@ public class CommandExecutorService implements CommandExecutor {
TimerTask timeoutTask = new TimerTask() { TimerTask timeoutTask = new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
if (attemptPromise.isDone()) {
return;
}
attemptPromise.tryFailure(exceptionRef.get()); attemptPromise.tryFailure(exceptionRef.get());
} }
}; };
@ -554,18 +550,18 @@ public class CommandExecutorService implements CommandExecutor {
if (future.cause() instanceof RedisMovedException) { if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause(); RedisMovedException ex = (RedisMovedException)future.cause();
async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), messageDecoder, codec, command, params, mainPromise, 0); async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), messageDecoder, codec, command, params, mainPromise, attempt);
return; return;
} }
if (future.cause() instanceof RedisAskException) { if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause(); RedisAskException ex = (RedisAskException)future.cause();
async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), messageDecoder, codec, command, params, mainPromise, 0); async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), messageDecoder, codec, command, params, mainPromise, attempt);
return; return;
} }
if (future.cause() instanceof RedisLoadingException) { if (future.cause() instanceof RedisLoadingException) {
async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, 0); async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, attempt);
return; return;
} }

@ -118,7 +118,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
// close all pub/sub connections // close all pub/sub connections
while (true) { while (true) {
RedisPubSubConnection connection = connectionEntry.pollFreeSubscribeConnection(); RedisPubSubConnection connection = connectionEntry.pollSubscribeConnection();
if (connection == null) { if (connection == null) {
break; break;
} }

@ -41,7 +41,7 @@ public class SubscribesConnectionEntry extends ConnectionEntry {
return allSubscribeConnections; return allSubscribeConnections;
} }
public RedisPubSubConnection pollFreeSubscribeConnection() { public RedisPubSubConnection pollSubscribeConnection() {
return freeSubscribeConnections.poll(); return freeSubscribeConnections.poll();
} }

@ -282,7 +282,12 @@ public class ConnectionPool<T extends RedisConnection> {
return; return;
} }
final RedisConnection c = future.getNow(); final RedisConnection c = future.getNow();
if (c.isActive()) { if (!c.isActive()) {
c.closeAsync();
scheduleCheck(entry);
return;
}
Future<String> f = c.asyncWithTimeout(null, RedisCommands.PING); Future<String> f = c.asyncWithTimeout(null, RedisCommands.PING);
f.addListener(new FutureListener<String>() { f.addListener(new FutureListener<String>() {
@Override @Override
@ -322,10 +327,6 @@ public class ConnectionPool<T extends RedisConnection> {
} }
} }
}); });
} else {
c.closeAsync();
scheduleCheck(entry);
}
} }
}); });
} }

@ -33,7 +33,7 @@ public class PubSubConnectionPoll extends ConnectionPool<RedisPubSubConnection>
@Override @Override
protected RedisPubSubConnection poll(SubscribesConnectionEntry entry) { protected RedisPubSubConnection poll(SubscribesConnectionEntry entry) {
return entry.pollFreeSubscribeConnection(); return entry.pollSubscribeConnection();
} }
@Override @Override

Loading…
Cancel
Save