RBlockingQueue.poll timeout handling fixed.

pull/297/head
Nikita 9 years ago
parent 853a0c2803
commit dce8df4938

@ -477,7 +477,7 @@ public class CommandExecutorService implements CommandExecutor {
}
};
exceptionRef.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params " + Arrays.toString(params)));
exceptionRef.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params: " + Arrays.toString(params)));
Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
timeoutRef.set(timeout);
@ -493,7 +493,7 @@ public class CommandExecutorService implements CommandExecutor {
return;
}
RedisConnection connection = connFuture.getNow();
final RedisConnection connection = connFuture.getNow();
if (source.getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
@ -523,10 +523,17 @@ public class CommandExecutorService implements CommandExecutor {
TimerTask timeoutTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
attemptPromise.tryFailure(exceptionRef.get());
attemptPromise.tryFailure(
new RedisTimeoutException("Redis server response timeout for command: " + command
+ " with params: " + Arrays.toString(params) + " channel: " + connection.getChannel()));
}
};
Timeout timeout = connectionManager.newTimeout(timeoutTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
int timeoutTime = connectionManager.getConfig().getTimeout();
if (command.getName().equals(RedisCommands.BLPOP_VALUE.getName())) {
timeoutTime += Integer.valueOf(params[params.length - 1].toString())*1000;
}
Timeout timeout = connectionManager.newTimeout(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS);
timeoutRef.set(timeout);
}
}

@ -178,6 +178,15 @@ public class RedisConnection implements RedisCommands {
channel.close();
}
/**
* Access to Netty channel.
* This method is only provided to use in debug info.
*
*/
public Channel getChannel() {
return channel;
}
public ChannelFuture closeAsync() {
setClosed(true);
return channel.close();

@ -20,7 +20,10 @@ public class RedissonBlockingQueueTest extends BaseTest {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue1");
queue1.put(1);
Assert.assertEquals((Integer)1, queue1.poll(2, TimeUnit.SECONDS));
Assert.assertNull(queue1.poll(2, TimeUnit.SECONDS));
long s = System.currentTimeMillis();
Assert.assertNull(queue1.poll(5, TimeUnit.SECONDS));
Assert.assertTrue(System.currentTimeMillis() - s > 5000);
}
@Test
public void testAwait() throws InterruptedException {

Loading…
Cancel
Save