Fixed - Missing PubSub messages when pingConnectionInterval setting is specified. #1497

pull/1506/head
Nikita 7 years ago
parent 1d7252555a
commit 979d11ae0e

@ -165,7 +165,7 @@ public class RedisConnection implements RedisCommands {
} }
} }
public <T> T sync(RedisStrictCommand<T> command, Object ... params) { public <T> T sync(RedisCommand<T> command, Object ... params) {
return sync(null, command, params); return sync(null, command, params);
} }

@ -18,8 +18,10 @@ package org.redisson.client.handler;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
@ -43,7 +45,7 @@ import io.netty.util.internal.PlatformDependent;
*/ */
public class CommandPubSubDecoder extends CommandDecoder { public class CommandPubSubDecoder extends CommandDecoder {
private static final List<String> MESSAGES = Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe"); private static final Set<String> MESSAGES = new HashSet<String>(Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe"));
// It is not needed to use concurrent map because responses are coming consecutive // It is not needed to use concurrent map because responses are coming consecutive
private final Map<String, PubSubEntry> entries = new HashMap<String, PubSubEntry>(); private final Map<String, PubSubEntry> entries = new HashMap<String, PubSubEntry>();
private final Map<PubSubKey, CommandData<Object, Object>> commands = PlatformDependent.newConcurrentHashMap(); private final Map<PubSubKey, CommandData<Object, Object>> commands = PlatformDependent.newConcurrentHashMap();

@ -57,6 +57,7 @@ public class PingConnectionHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
if (future.cancel(false) || !future.isSuccess()) { if (future.cancel(false) || !future.isSuccess()) {
System.out.println("closed!!! " + future + " " + connection.getChannel());
ctx.channel().close(); ctx.channel().close();
} else { } else {
sendPing(ctx); sendPing(ctx);

@ -57,7 +57,7 @@ import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapJoinDecoder; import org.redisson.client.protocol.decoder.ObjectMapJoinDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.QueueObjectDecoder; import org.redisson.client.protocol.decoder.ListObjectDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetPolledObjectDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetPolledObjectDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder;
@ -143,7 +143,7 @@ public interface RedisCommands {
RedisCommand<ListScanResult<String>> SCAN = new RedisCommand<ListScanResult<String>>("SCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder<String>(), new ListScanResultReplayDecoder())); RedisCommand<ListScanResult<String>> SCAN = new RedisCommand<ListScanResult<String>>("SCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder<String>(), new ListScanResultReplayDecoder()));
RedisStrictCommand<String> RANDOM_KEY = new RedisStrictCommand<String>("RANDOMKEY"); RedisStrictCommand<String> RANDOM_KEY = new RedisStrictCommand<String>("RANDOMKEY");
RedisStrictCommand<String> PING = new RedisStrictCommand<String>("PING"); RedisCommand<String> PING = new RedisCommand<String>("PING", new ListObjectDecoder<String>(0));
RedisStrictCommand<Boolean> PING_BOOL = new RedisStrictCommand<Boolean>("PING", new BooleanNotNullReplayConvertor()); RedisStrictCommand<Boolean> PING_BOOL = new RedisStrictCommand<Boolean>("PING", new BooleanNotNullReplayConvertor());
RedisStrictCommand<Void> UNWATCH = new RedisStrictCommand<Void>("UNWATCH", new VoidReplayConvertor()); RedisStrictCommand<Void> UNWATCH = new RedisStrictCommand<Void>("UNWATCH", new VoidReplayConvertor());
@ -192,8 +192,8 @@ public interface RedisCommands {
RedisCommand<Object> RPOPLPUSH = new RedisCommand<Object>("RPOPLPUSH"); RedisCommand<Object> RPOPLPUSH = new RedisCommand<Object>("RPOPLPUSH");
RedisCommand<Object> BRPOPLPUSH = new RedisCommand<Object>("BRPOPLPUSH"); RedisCommand<Object> BRPOPLPUSH = new RedisCommand<Object>("BRPOPLPUSH");
RedisCommand<Object> BLPOP_VALUE = new RedisCommand<Object>("BLPOP", new QueueObjectDecoder()); RedisCommand<Object> BLPOP_VALUE = new RedisCommand<Object>("BLPOP", new ListObjectDecoder<Object>(1));
RedisCommand<Object> BRPOP_VALUE = new RedisCommand<Object>("BRPOP", new QueueObjectDecoder()); RedisCommand<Object> BRPOP_VALUE = new RedisCommand<Object>("BRPOP", new ListObjectDecoder<Object>(1));
RedisCommand<Object> BZPOPMIN_VALUE = new RedisCommand<Object>("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder()); RedisCommand<Object> BZPOPMIN_VALUE = new RedisCommand<Object>("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder());
RedisCommand<Object> BZPOPMAX_VALUE = new RedisCommand<Object>("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder()); RedisCommand<Object> BZPOPMAX_VALUE = new RedisCommand<Object>("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder());

@ -26,7 +26,14 @@ import org.redisson.client.protocol.Decoder;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class QueueObjectDecoder implements MultiDecoder<Object> { public class ListObjectDecoder<T> implements MultiDecoder<T> {
private int index;
public ListObjectDecoder(int index) {
super();
this.index = index;
}
@Override @Override
public Decoder<Object> getDecoder(int paramNum, State state) { public Decoder<Object> getDecoder(int paramNum, State state) {
@ -37,11 +44,11 @@ public class QueueObjectDecoder implements MultiDecoder<Object> {
} }
@Override @Override
public Object decode(List<Object> parts, State state) { public T decode(List<Object> parts, State state) {
if (parts.isEmpty()) { if (parts.isEmpty()) {
return null; return null;
} }
return parts.get(1); return (T) parts.get(index);
} }
} }

@ -195,4 +195,9 @@ public class RedissonPromise<T> implements RPromise<T> {
} }
} }
@Override
public String toString() {
return "RedissonPromise [promise=" + promise + "]";
}
} }

@ -7,7 +7,11 @@ import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -104,6 +108,31 @@ public class RedissonTopicTest {
} }
@Test
public void testPing() throws InterruptedException {
Config config = BaseTest.createConfig();
config.useSingleServer().setPingConnectionInterval(50);
RedissonClient redisson = Redisson.create(config);
Set<String> sentItems = new HashSet<>();
Set<String> receivedItems = new HashSet<>();
RTopic<String> eventsTopic = redisson.getTopic("eventsTopic");
eventsTopic.addListener((channel, msg) -> receivedItems.add(msg));
for(int i = 0; i<1000; i++){
final String message = UUID.randomUUID().toString();
eventsTopic.publish(message);
sentItems.add(message);
Thread.sleep(10);
}
Thread.sleep(2000);
assertThat(sentItems).hasSameSizeAs(receivedItems);
redisson.shutdown();
}
@Test @Test
public void testConcurrentTopic() throws Exception { public void testConcurrentTopic() throws Exception {
RedissonClient redisson = BaseTest.createInstance(); RedissonClient redisson = BaseTest.createInstance();

Loading…
Cancel
Save