Fixed - disconnected pubsub connection leads to missed response for unsubscribe/punsubscribe operations.

pull/555/head
Nikita 9 years ago
parent d5b27a0248
commit fae7f5b8ed

@ -49,7 +49,7 @@ public class RedisConnection implements RedisCommands {
private long lastUsageTime;
private final Future<?> acquireFuture = ImmediateEventExecutor.INSTANCE.newSucceededFuture(this);
public RedisConnection(RedisClient redisClient, Channel channel) {
super();
this.redisClient = redisClient;
@ -235,4 +235,7 @@ public class RedisConnection implements RedisCommands {
return acquireFuture;
}
public void onDisconnect() {
}
}

@ -16,8 +16,10 @@
package org.redisson.client;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.client.codec.Codec;
@ -30,9 +32,12 @@ import org.redisson.client.protocol.pubsub.PubSubMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubPatternMessage;
import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubType;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
public class RedisPubSubConnection extends RedisConnection {
@ -40,6 +45,8 @@ public class RedisPubSubConnection extends RedisConnection {
final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();
final Map<String, Codec> channels = PlatformDependent.newConcurrentHashMap();
final Map<String, Codec> patternChannels = PlatformDependent.newConcurrentHashMap();
final Set<String> unsubscibedChannels = new HashSet<String>();
final Set<String> punsubscibedChannels = new HashSet<String>();
public RedisPubSubConnection(RedisClient redisClient, Channel channel) {
super(redisClient, channel);
@ -89,18 +96,69 @@ public class RedisPubSubConnection extends RedisConnection {
}
}
public void unsubscribe(String ... channel) {
async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channel);
for (String ch : channel) {
channels.remove(ch);
public void unsubscribe(final String ... channels) {
synchronized (this) {
for (String ch : channels) {
this.channels.remove(ch);
unsubscibedChannels.add(ch);
}
}
ChannelFuture future = async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channels);
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
for (String channel : channels) {
removeDisconnectListener(channel);
onMessage(new PubSubStatusMessage(PubSubType.UNSUBSCRIBE, channel));
}
}
}
});
}
public void removeDisconnectListener(String channel) {
synchronized (this) {
unsubscibedChannels.remove(channel);
punsubscibedChannels.remove(channel);
}
}
@Override
public void onDisconnect() {
Set<String> channels = new HashSet<String>();
Set<String> pchannels = new HashSet<String>();
synchronized (this) {
channels.addAll(unsubscibedChannels);
pchannels.addAll(punsubscibedChannels);
}
for (String channel : channels) {
onMessage(new PubSubStatusMessage(PubSubType.UNSUBSCRIBE, channel));
}
for (String channel : pchannels) {
onMessage(new PubSubStatusMessage(PubSubType.PUNSUBSCRIBE, channel));
}
}
public void punsubscribe(String ... channel) {
async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channel);
for (String ch : channel) {
patternChannels.remove(ch);
public void punsubscribe(final String ... channels) {
synchronized (this) {
for (String ch : channels) {
patternChannels.remove(ch);
punsubscibedChannels.add(ch);
}
}
ChannelFuture future = async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channels);
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
for (String channel : channels) {
removeDisconnectListener(channel);
onMessage(new PubSubStatusMessage(PubSubType.PUNSUBSCRIBE, channel));
}
}
}
});
}
private <T, R> ChannelFuture async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {

@ -61,6 +61,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
RedisConnection connection = RedisConnection.getFrom(ctx.channel());
connection.onDisconnect();
if (!connection.isClosed()) {
EventLoopGroup group = ctx.channel().eventLoop().parent();
reconnect(group, connection);

@ -171,7 +171,8 @@ public class PubSubConnectionEntry {
conn.unsubscribe(channel);
}
public void removeListeners(String channel) {
private void removeListeners(String channel) {
conn.removeDisconnectListener(channel);
SubscribeListener s = subscribeChannelListeners.remove(channel);
conn.removeListener(s);
Queue<RedisPubSubListener> queue = channelListeners.get(channel);

Loading…
Cancel
Save