From 979d11ae0e6982c0cdd4100d01435bd5c2ae3919 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 21 Jun 2018 15:07:45 +0300 Subject: [PATCH] Fixed - Missing PubSub messages when pingConnectionInterval setting is specified. #1497 --- .../org/redisson/client/RedisConnection.java | 2 +- .../client/handler/CommandPubSubDecoder.java | 4 ++- .../client/handler/PingConnectionHandler.java | 1 + .../client/protocol/RedisCommands.java | 8 ++--- ...ectDecoder.java => ListObjectDecoder.java} | 13 +++++++-- .../org/redisson/misc/RedissonPromise.java | 7 ++++- .../java/org/redisson/RedissonTopicTest.java | 29 +++++++++++++++++++ 7 files changed, 54 insertions(+), 10 deletions(-) rename redisson/src/main/java/org/redisson/client/protocol/decoder/{QueueObjectDecoder.java => ListObjectDecoder.java} (80%) diff --git a/redisson/src/main/java/org/redisson/client/RedisConnection.java b/redisson/src/main/java/org/redisson/client/RedisConnection.java index 9b28610c4..4e622c5c7 100644 --- a/redisson/src/main/java/org/redisson/client/RedisConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisConnection.java @@ -165,7 +165,7 @@ public class RedisConnection implements RedisCommands { } } - public T sync(RedisStrictCommand command, Object ... params) { + public T sync(RedisCommand command, Object ... params) { return sync(null, command, params); } diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java index ce41802c6..c69342fa6 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java @@ -18,8 +18,10 @@ package org.redisson.client.handler; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import org.redisson.client.RedisPubSubConnection; @@ -43,7 +45,7 @@ import io.netty.util.internal.PlatformDependent; */ public class CommandPubSubDecoder extends CommandDecoder { - private static final List MESSAGES = Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe"); + private static final Set MESSAGES = new HashSet(Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe")); // It is not needed to use concurrent map because responses are coming consecutive private final Map entries = new HashMap(); private final Map> commands = PlatformDependent.newConcurrentHashMap(); diff --git a/redisson/src/main/java/org/redisson/client/handler/PingConnectionHandler.java b/redisson/src/main/java/org/redisson/client/handler/PingConnectionHandler.java index 64674248d..1132d5bf9 100644 --- a/redisson/src/main/java/org/redisson/client/handler/PingConnectionHandler.java +++ b/redisson/src/main/java/org/redisson/client/handler/PingConnectionHandler.java @@ -57,6 +57,7 @@ public class PingConnectionHandler extends ChannelInboundHandlerAdapter { @Override public void run(Timeout timeout) throws Exception { if (future.cancel(false) || !future.isSuccess()) { + System.out.println("closed!!! " + future + " " + connection.getChannel()); ctx.channel().close(); } else { sendPing(ctx); diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index e7e0a86e6..bd88eb949 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -57,7 +57,7 @@ import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapJoinDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; -import org.redisson.client.protocol.decoder.QueueObjectDecoder; +import org.redisson.client.protocol.decoder.ListObjectDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetPolledObjectDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder; @@ -143,7 +143,7 @@ public interface RedisCommands { RedisCommand> SCAN = new RedisCommand>("SCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder())); RedisStrictCommand RANDOM_KEY = new RedisStrictCommand("RANDOMKEY"); - RedisStrictCommand PING = new RedisStrictCommand("PING"); + RedisCommand PING = new RedisCommand("PING", new ListObjectDecoder(0)); RedisStrictCommand PING_BOOL = new RedisStrictCommand("PING", new BooleanNotNullReplayConvertor()); RedisStrictCommand UNWATCH = new RedisStrictCommand("UNWATCH", new VoidReplayConvertor()); @@ -192,8 +192,8 @@ public interface RedisCommands { RedisCommand RPOPLPUSH = new RedisCommand("RPOPLPUSH"); RedisCommand BRPOPLPUSH = new RedisCommand("BRPOPLPUSH"); - RedisCommand BLPOP_VALUE = new RedisCommand("BLPOP", new QueueObjectDecoder()); - RedisCommand BRPOP_VALUE = new RedisCommand("BRPOP", new QueueObjectDecoder()); + RedisCommand BLPOP_VALUE = new RedisCommand("BLPOP", new ListObjectDecoder(1)); + RedisCommand BRPOP_VALUE = new RedisCommand("BRPOP", new ListObjectDecoder(1)); RedisCommand BZPOPMIN_VALUE = new RedisCommand("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder()); RedisCommand BZPOPMAX_VALUE = new RedisCommand("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder()); diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/QueueObjectDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListObjectDecoder.java similarity index 80% rename from redisson/src/main/java/org/redisson/client/protocol/decoder/QueueObjectDecoder.java rename to redisson/src/main/java/org/redisson/client/protocol/decoder/ListObjectDecoder.java index 0757319d3..410cd42f2 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/QueueObjectDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListObjectDecoder.java @@ -26,7 +26,14 @@ import org.redisson.client.protocol.Decoder; * @author Nikita Koksharov * */ -public class QueueObjectDecoder implements MultiDecoder { +public class ListObjectDecoder implements MultiDecoder { + + private int index; + + public ListObjectDecoder(int index) { + super(); + this.index = index; + } @Override public Decoder getDecoder(int paramNum, State state) { @@ -37,11 +44,11 @@ public class QueueObjectDecoder implements MultiDecoder { } @Override - public Object decode(List parts, State state) { + public T decode(List parts, State state) { if (parts.isEmpty()) { return null; } - return parts.get(1); + return (T) parts.get(index); } } diff --git a/redisson/src/main/java/org/redisson/misc/RedissonPromise.java b/redisson/src/main/java/org/redisson/misc/RedissonPromise.java index b925e5028..f365b30ef 100644 --- a/redisson/src/main/java/org/redisson/misc/RedissonPromise.java +++ b/redisson/src/main/java/org/redisson/misc/RedissonPromise.java @@ -194,5 +194,10 @@ public class RedissonPromise implements RPromise { throw new IllegalStateException(e); } } - + + @Override + public String toString() { + return "RedissonPromise [promise=" + promise + "]"; + } + } diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 501067601..21d1161b3 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -7,7 +7,11 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -104,6 +108,31 @@ public class RedissonTopicTest { } + @Test + public void testPing() throws InterruptedException { + Config config = BaseTest.createConfig(); + config.useSingleServer().setPingConnectionInterval(50); + RedissonClient redisson = Redisson.create(config); + + Set sentItems = new HashSet<>(); + Set receivedItems = new HashSet<>(); + + RTopic eventsTopic = redisson.getTopic("eventsTopic"); + eventsTopic.addListener((channel, msg) -> receivedItems.add(msg)); + + for(int i = 0; i<1000; i++){ + final String message = UUID.randomUUID().toString(); + eventsTopic.publish(message); + sentItems.add(message); + Thread.sleep(10); + } + + Thread.sleep(2000); + + assertThat(sentItems).hasSameSizeAs(receivedItems); + redisson.shutdown(); + } + @Test public void testConcurrentTopic() throws Exception { RedissonClient redisson = BaseTest.createInstance();