From 1a2ea4db43f5e7d3075aa02621894c940f5a8a6a Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 25 Mar 2016 15:33:34 +0300 Subject: [PATCH] Long BlockingQueue\Deque commands reattaching. #449 --- .../org/redisson/client/RedisConnection.java | 10 +++++ .../client/handler/ConnectionWatchdog.java | 33 ++++++++++++--- .../client/protocol/QueueCommand.java | 3 ++ .../redisson/command/CommandAsyncService.java | 7 +--- .../redisson/RedissonBlockingQueueTest.java | 40 +++++++++++++++++++ 5 files changed, 83 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index 53f1c9383..9db33c5c0 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -18,8 +18,10 @@ package org.redisson.client; import java.util.concurrent.TimeUnit; import org.redisson.client.codec.Codec; +import org.redisson.client.handler.CommandsQueue; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; +import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; @@ -59,6 +61,14 @@ public class RedisConnection implements RedisCommands { return (C) channel.attr(RedisConnection.CONNECTION).get(); } + public CommandData getCurrentCommand() { + QueueCommand command = channel.attr(CommandsQueue.CURRENT_COMMAND).get(); + if (command instanceof CommandData) { + return (CommandData)command; + } + return null; + } + public long getLastUsageTime() { return lastUsageTime; } diff --git a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index 81a030ce6..1afce25e6 100644 --- a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -22,6 +22,8 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.CommandData; +import org.redisson.client.protocol.QueueCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,18 +123,16 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { - connection.updateChannel(channel); - resubscribe(connection); + refresh(connection, channel); } } }); } else { - connection.updateChannel(channel); - resubscribe(connection); + refresh(connection, channel); } } - private void resubscribe(RedisConnection connection) { + private void reattachPubSub(RedisConnection connection) { if (connection instanceof RedisPubSubConnection) { RedisPubSubConnection conn = (RedisPubSubConnection) connection; for (Entry entry : conn.getChannels().entrySet()) { @@ -149,4 +149,27 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { ctx.channel().close(); } + private void refresh(RedisConnection connection, Channel channel) { + CommandData commandData = connection.getCurrentCommand(); + connection.updateChannel(channel); + + reattachBlockingQueue(connection, commandData); + reattachPubSub(connection); + } + + private void reattachBlockingQueue(RedisConnection connection, final CommandData commandData) { + if (commandData != null + && QueueCommand.TIMEOUTLESS_COMMANDS.contains(commandData.getCommand().getName())) { + ChannelFuture future = connection.send(commandData); + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't reconnect blocking queue to new connection {}", commandData); + } + } + }); + } + } + } diff --git a/src/main/java/org/redisson/client/protocol/QueueCommand.java b/src/main/java/org/redisson/client/protocol/QueueCommand.java index 028810a34..33582c982 100644 --- a/src/main/java/org/redisson/client/protocol/QueueCommand.java +++ b/src/main/java/org/redisson/client/protocol/QueueCommand.java @@ -23,6 +23,9 @@ import java.util.Set; public interface QueueCommand { Set PUBSUB_COMMANDS = new HashSet(Arrays.asList("PSUBSCRIBE", "SUBSCRIBE", "PUNSUBSCRIBE", "UNSUBSCRIBE")); + + Set TIMEOUTLESS_COMMANDS = new HashSet(Arrays.asList(RedisCommands.BLPOP_VALUE.getName(), + RedisCommands.BRPOP_VALUE.getName(), RedisCommands.BRPOPLPUSH.getName())); List> getPubSubOperations(); diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index b6c753077..1abdce13d 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -39,6 +38,7 @@ import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; +import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.cluster.ClusterSlotRange; @@ -68,9 +68,6 @@ public class CommandAsyncService implements CommandAsyncExecutor { final ConnectionManager connectionManager; - private final Set skipTimeout = new HashSet(Arrays.asList(RedisCommands.BLPOP_VALUE.getName(), - RedisCommands.BRPOP_VALUE.getName(), RedisCommands.BRPOPLPUSH.getName())); - public CommandAsyncService(ConnectionManager connectionManager) { this.connectionManager = connectionManager; } @@ -461,7 +458,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.getTimeout().cancel(); int timeoutTime = connectionManager.getConfig().getTimeout(); - if (skipTimeout.contains(details.getCommand().getName())) { + if (QueueCommand.TIMEOUTLESS_COMMANDS.contains(details.getCommand().getName())) { Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString()); handleBlockingOperations(details, connection); if (popTimeout == 0) { diff --git a/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 5fab2294d..e4c5938be 100644 --- a/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -1,7 +1,9 @@ package org.redisson; +import static com.jayway.awaitility.Awaitility.await; import static org.assertj.core.api.Assertions.*; +import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedList; @@ -11,18 +13,56 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import org.redisson.RedisRunner.RedisProcess; import org.redisson.core.RBlockingQueue; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; public class RedissonBlockingQueueTest extends BaseTest { + @Test + public void testTakeReattach() throws InterruptedException, IOException { + RedisProcess runner = new RedisRunner().port(6319).run(); + + Config config = new Config(); + config.useSingleServer().setAddress("127.0.0.1:6319"); + RedissonClient redisson = Redisson.create(config); + final RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); + Future f = queue1.takeAsync(); + + final AtomicBoolean executed = new AtomicBoolean(); + f.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + assertThat(future.get()).isEqualTo(123); + executed.set(true); + } + }); + + f.await(1, TimeUnit.SECONDS); + runner.stop(); + + runner = new RedisRunner().port(6319).run(); + queue1.put(123); + + // check connection rotation + for (int i = 0; i < 10; i++) { + queue1.put(i); + } + assertThat(queue1.size()).isEqualTo(10); + + await().atMost(1, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue()); + runner.stop(); + } + @Test public void testTakeAsyncCancel() { Config config = createConfig();