From 2f5f8e6fdfe822dd6dfaf43bc61627d3b6be4a0d Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 15 Oct 2015 15:55:34 +0300 Subject: [PATCH 1/2] minor improvements --- .../client/handler/CommandDecoder.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index 5398693fb..268ce8418 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -126,7 +126,7 @@ public class CommandDecoder extends ReplayingDecoder { if (i == commands.getCommands().size()) { Promise promise = commands.getPromise(); - if (!promise.trySuccess(null) && promise.isCancelled()) { + if (!promise.trySuccess(null)) { log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data); } @@ -214,7 +214,9 @@ public class CommandDecoder extends ReplayingDecoder { private void handleMultiResult(CommandData data, List parts, Channel channel, Object result) { - if (data == null) { + if (data != null) { + handleResult(data, parts, result, true, channel); + } else { if (result instanceof PubSubStatusMessage) { String channelName = ((PubSubStatusMessage) result).getChannel(); CommandData d = channels.get(channelName); @@ -227,11 +229,7 @@ public class CommandDecoder extends ReplayingDecoder { messageDecoders.remove(channelName); } } - } - if (data != null) { - handleResult(data, parts, result, true, channel); - } else { RedisPubSubConnection pubSubConnection = (RedisPubSubConnection)channel.attr(RedisPubSubConnection.CONNECTION).get(); if (result instanceof PubSubStatusMessage) { pubSubConnection.onMessage((PubSubStatusMessage) result); @@ -254,7 +252,7 @@ public class CommandDecoder extends ReplayingDecoder { if (parts != null) { parts.add(result); } else { - if (!data.getPromise().trySuccess(result) && data.getPromise().isCancelled()) { + if (!data.getPromise().trySuccess(result)) { log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, data, result); } } @@ -264,7 +262,11 @@ public class CommandDecoder extends ReplayingDecoder { if (data == null) { if (Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe").contains(parts.get(0))) { String channelName = (String) parts.get(1); - return channels.get(channelName).getCommand().getReplayMultiDecoder(); + CommandData commandData = channels.get(channelName); + if (commandData == null) { + throw new IllegalStateException("Can't find CommandData for command: " + parts); + } + return commandData.getCommand().getReplayMultiDecoder(); } else if (parts.get(0).equals("message")) { String channelName = (String) parts.get(1); return messageDecoders.get(channelName); From dab99e363a96b3b97690723b16c27f642b90090d Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 15 Oct 2015 15:56:06 +0300 Subject: [PATCH 2/2] additional assertions --- src/test/java/org/redisson/BaseConcurrentTest.java | 8 +++++--- src/test/java/org/redisson/RedisClientTest.java | 2 +- .../redisson/RedissonCountDownLatchConcurrentTest.java | 2 +- .../java/org/redisson/RedissonCountDownLatchTest.java | 4 ++-- src/test/java/org/redisson/RedissonTopicTest.java | 3 --- 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/test/java/org/redisson/BaseConcurrentTest.java b/src/test/java/org/redisson/BaseConcurrentTest.java index af2140ca8..1d413363c 100644 --- a/src/test/java/org/redisson/BaseConcurrentTest.java +++ b/src/test/java/org/redisson/BaseConcurrentTest.java @@ -6,6 +6,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.junit.Assert; + public abstract class BaseConcurrentTest extends BaseTest { protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { @@ -29,7 +31,7 @@ public abstract class BaseConcurrentTest extends BaseTest { } executor.shutdown(); - executor.awaitTermination(5, TimeUnit.MINUTES); + Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); System.out.println("multi: " + (System.currentTimeMillis() - watch)); @@ -45,7 +47,7 @@ public abstract class BaseConcurrentTest extends BaseTest { } executor.shutdown(); - executor.awaitTermination(5, TimeUnit.MINUTES); + Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); } protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { @@ -63,7 +65,7 @@ public abstract class BaseConcurrentTest extends BaseTest { } executor.shutdown(); - executor.awaitTermination(5, TimeUnit.MINUTES); + Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); System.out.println(System.currentTimeMillis() - watch); diff --git a/src/test/java/org/redisson/RedisClientTest.java b/src/test/java/org/redisson/RedisClientTest.java index 62d62d799..702ee84c1 100644 --- a/src/test/java/org/redisson/RedisClientTest.java +++ b/src/test/java/org/redisson/RedisClientTest.java @@ -90,7 +90,7 @@ public class RedisClientTest { } pool.shutdown(); - pool.awaitTermination(1, TimeUnit.HOURS); + Assert.assertTrue(pool.awaitTermination(1, TimeUnit.HOURS)); Assert.assertEquals(100000L, conn.sync(LongCodec.INSTANCE, RedisCommands.GET, "test")); diff --git a/src/test/java/org/redisson/RedissonCountDownLatchConcurrentTest.java b/src/test/java/org/redisson/RedissonCountDownLatchConcurrentTest.java index f319f7f24..417a57bc3 100644 --- a/src/test/java/org/redisson/RedissonCountDownLatchConcurrentTest.java +++ b/src/test/java/org/redisson/RedissonCountDownLatchConcurrentTest.java @@ -48,7 +48,7 @@ public class RedissonCountDownLatchConcurrentTest { } executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); + Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); redisson.shutdown(); } diff --git a/src/test/java/org/redisson/RedissonCountDownLatchTest.java b/src/test/java/org/redisson/RedissonCountDownLatchTest.java index a76e910c2..6e2e76789 100644 --- a/src/test/java/org/redisson/RedissonCountDownLatchTest.java +++ b/src/test/java/org/redisson/RedissonCountDownLatchTest.java @@ -44,7 +44,7 @@ public class RedissonCountDownLatchTest extends BaseTest { }); executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); + Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); } @@ -82,7 +82,7 @@ public class RedissonCountDownLatchTest extends BaseTest { }); executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); + Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); } @Test diff --git a/src/test/java/org/redisson/RedissonTopicTest.java b/src/test/java/org/redisson/RedissonTopicTest.java index 4d51b7f8f..23158db91 100644 --- a/src/test/java/org/redisson/RedissonTopicTest.java +++ b/src/test/java/org/redisson/RedissonTopicTest.java @@ -1,16 +1,13 @@ package org.redisson; import java.io.Serializable; -import java.util.Iterator; import java.util.concurrent.CountDownLatch; import org.junit.Assert; import org.junit.Test; -import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.core.BaseStatusListener; import org.redisson.core.MessageListener; import org.redisson.core.RTopic; -import org.redisson.core.StatusListener; public class RedissonTopicTest {