pull/261/head
lefay 9 years ago
commit 3c932ff263

@ -126,7 +126,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (i == commands.getCommands().size()) {
Promise<Void> promise = commands.getPromise();
if (!promise.trySuccess(null) && promise.isCancelled()) {
if (!promise.trySuccess(null)) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data);
}
@ -214,7 +214,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
private void handleMultiResult(CommandData<Object, Object> data, List<Object> parts,
Channel channel, Object result) {
if (data == null) {
if (data != null) {
handleResult(data, parts, result, true, channel);
} else {
if (result instanceof PubSubStatusMessage) {
String channelName = ((PubSubStatusMessage) result).getChannel();
CommandData<Object, Object> d = channels.get(channelName);
@ -227,11 +229,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
messageDecoders.remove(channelName);
}
}
}
if (data != null) {
handleResult(data, parts, result, true, channel);
} else {
RedisPubSubConnection pubSubConnection = (RedisPubSubConnection)channel.attr(RedisPubSubConnection.CONNECTION).get();
if (result instanceof PubSubStatusMessage) {
pubSubConnection.onMessage((PubSubStatusMessage) result);
@ -254,7 +252,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (parts != null) {
parts.add(result);
} else {
if (!data.getPromise().trySuccess(result) && data.getPromise().isCancelled()) {
if (!data.getPromise().trySuccess(result)) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, data, result);
}
}
@ -264,7 +262,11 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (data == null) {
if (Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe").contains(parts.get(0))) {
String channelName = (String) parts.get(1);
return channels.get(channelName).getCommand().getReplayMultiDecoder();
CommandData<Object, Object> commandData = channels.get(channelName);
if (commandData == null) {
throw new IllegalStateException("Can't find CommandData for command: " + parts);
}
return commandData.getCommand().getReplayMultiDecoder();
} else if (parts.get(0).equals("message")) {
String channelName = (String) parts.get(1);
return messageDecoders.get(channelName);

@ -6,6 +6,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
public abstract class BaseConcurrentTest extends BaseTest {
protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
@ -29,7 +31,7 @@ public abstract class BaseConcurrentTest extends BaseTest {
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.MINUTES);
Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
System.out.println("multi: " + (System.currentTimeMillis() - watch));
@ -45,7 +47,7 @@ public abstract class BaseConcurrentTest extends BaseTest {
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.MINUTES);
Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
}
protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
@ -63,7 +65,7 @@ public abstract class BaseConcurrentTest extends BaseTest {
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.MINUTES);
Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
System.out.println(System.currentTimeMillis() - watch);

@ -90,7 +90,7 @@ public class RedisClientTest {
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.HOURS);
Assert.assertTrue(pool.awaitTermination(1, TimeUnit.HOURS));
Assert.assertEquals(100000L, conn.sync(LongCodec.INSTANCE, RedisCommands.GET, "test"));

@ -48,7 +48,7 @@ public class RedissonCountDownLatchConcurrentTest {
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
redisson.shutdown();
}

@ -44,7 +44,7 @@ public class RedissonCountDownLatchTest extends BaseTest {
});
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
}
@ -82,7 +82,7 @@ public class RedissonCountDownLatchTest extends BaseTest {
});
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
}
@Test

@ -1,16 +1,13 @@
package org.redisson;
import java.io.Serializable;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.core.BaseStatusListener;
import org.redisson.core.MessageListener;
import org.redisson.core.RTopic;
import org.redisson.core.StatusListener;
public class RedissonTopicTest {

Loading…
Cancel
Save