Long BlockingQueue\Deque commands reattaching. #449

pull/456/head
Nikita 9 years ago
parent 4f138fde01
commit 1a2ea4db43

@ -18,8 +18,10 @@ package org.redisson.client;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.handler.CommandsQueue;
import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.RedisStrictCommand;
@ -59,6 +61,14 @@ public class RedisConnection implements RedisCommands {
return (C) channel.attr(RedisConnection.CONNECTION).get(); return (C) channel.attr(RedisConnection.CONNECTION).get();
} }
public CommandData getCurrentCommand() {
QueueCommand command = channel.attr(CommandsQueue.CURRENT_COMMAND).get();
if (command instanceof CommandData) {
return (CommandData)command;
}
return null;
}
public long getLastUsageTime() { public long getLastUsageTime() {
return lastUsageTime; return lastUsageTime;
} }

@ -22,6 +22,8 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.QueueCommand;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -121,18 +123,16 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
@Override @Override
public void operationComplete(Future<RedisConnection> future) throws Exception { public void operationComplete(Future<RedisConnection> future) throws Exception {
if (future.isSuccess()) { if (future.isSuccess()) {
connection.updateChannel(channel); refresh(connection, channel);
resubscribe(connection);
} }
} }
}); });
} else { } else {
connection.updateChannel(channel); refresh(connection, channel);
resubscribe(connection);
} }
} }
private void resubscribe(RedisConnection connection) { private void reattachPubSub(RedisConnection connection) {
if (connection instanceof RedisPubSubConnection) { if (connection instanceof RedisPubSubConnection) {
RedisPubSubConnection conn = (RedisPubSubConnection) connection; RedisPubSubConnection conn = (RedisPubSubConnection) connection;
for (Entry<String, Codec> entry : conn.getChannels().entrySet()) { for (Entry<String, Codec> entry : conn.getChannels().entrySet()) {
@ -149,4 +149,27 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
ctx.channel().close(); ctx.channel().close();
} }
private void refresh(RedisConnection connection, Channel channel) {
CommandData commandData = connection.getCurrentCommand();
connection.updateChannel(channel);
reattachBlockingQueue(connection, commandData);
reattachPubSub(connection);
}
private void reattachBlockingQueue(RedisConnection connection, final CommandData commandData) {
if (commandData != null
&& QueueCommand.TIMEOUTLESS_COMMANDS.contains(commandData.getCommand().getName())) {
ChannelFuture future = connection.send(commandData);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't reconnect blocking queue to new connection {}", commandData);
}
}
});
}
}
} }

@ -24,6 +24,9 @@ public interface QueueCommand {
Set<String> PUBSUB_COMMANDS = new HashSet<String>(Arrays.asList("PSUBSCRIBE", "SUBSCRIBE", "PUNSUBSCRIBE", "UNSUBSCRIBE")); Set<String> PUBSUB_COMMANDS = new HashSet<String>(Arrays.asList("PSUBSCRIBE", "SUBSCRIBE", "PUNSUBSCRIBE", "UNSUBSCRIBE"));
Set<String> TIMEOUTLESS_COMMANDS = new HashSet<String>(Arrays.asList(RedisCommands.BLPOP_VALUE.getName(),
RedisCommands.BRPOP_VALUE.getName(), RedisCommands.BRPOPLPUSH.getName()));
List<CommandData<Object, Object>> getPubSubOperations(); List<CommandData<Object, Object>> getPubSubOperations();
} }

@ -20,7 +20,6 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -39,6 +38,7 @@ import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
@ -68,9 +68,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
final ConnectionManager connectionManager; final ConnectionManager connectionManager;
private final Set<String> skipTimeout = new HashSet<String>(Arrays.asList(RedisCommands.BLPOP_VALUE.getName(),
RedisCommands.BRPOP_VALUE.getName(), RedisCommands.BRPOPLPUSH.getName()));
public CommandAsyncService(ConnectionManager connectionManager) { public CommandAsyncService(ConnectionManager connectionManager) {
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
} }
@ -461,7 +458,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.getTimeout().cancel(); details.getTimeout().cancel();
int timeoutTime = connectionManager.getConfig().getTimeout(); int timeoutTime = connectionManager.getConfig().getTimeout();
if (skipTimeout.contains(details.getCommand().getName())) { if (QueueCommand.TIMEOUTLESS_COMMANDS.contains(details.getCommand().getName())) {
Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString()); Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString());
handleBlockingOperations(details, connection); handleBlockingOperations(details, connection);
if (popTimeout == 0) { if (popTimeout == 0) {

@ -1,7 +1,9 @@
package org.redisson; package org.redisson;
import static com.jayway.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.*;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
@ -11,18 +13,56 @@ import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.MatcherAssert; import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.core.RBlockingQueue; import org.redisson.core.RBlockingQueue;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
public class RedissonBlockingQueueTest extends BaseTest { public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testTakeReattach() throws InterruptedException, IOException {
RedisProcess runner = new RedisRunner().port(6319).run();
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6319");
RedissonClient redisson = Redisson.create(config);
final RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
Future<Integer> f = queue1.takeAsync();
final AtomicBoolean executed = new AtomicBoolean();
f.addListener(new FutureListener<Integer>() {
@Override
public void operationComplete(Future<Integer> future) throws Exception {
assertThat(future.get()).isEqualTo(123);
executed.set(true);
}
});
f.await(1, TimeUnit.SECONDS);
runner.stop();
runner = new RedisRunner().port(6319).run();
queue1.put(123);
// check connection rotation
for (int i = 0; i < 10; i++) {
queue1.put(i);
}
assertThat(queue1.size()).isEqualTo(10);
await().atMost(1, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue());
runner.stop();
}
@Test @Test
public void testTakeAsyncCancel() { public void testTakeAsyncCancel() {
Config config = createConfig(); Config config = createConfig();

Loading…
Cancel
Save