diff --git a/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java b/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java index 32cee9814..5d8cbcc6f 100644 --- a/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -17,7 +17,6 @@ package org.redisson.client; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.CommandData; @@ -26,7 +25,10 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.pubsub.*; -import java.util.*; +import java.util.Collections; +import java.util.Map; +import java.util.Queue; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -38,11 +40,11 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public class RedisPubSubConnection extends RedisConnection { - final Queue> listeners = new ConcurrentLinkedQueue>(); + final Queue> listeners = new ConcurrentLinkedQueue<>(); final Map channels = new ConcurrentHashMap<>(); final Map patternChannels = new ConcurrentHashMap<>(); - final Set unsubscibedChannels = new HashSet(); - final Set punsubscibedChannels = new HashSet(); + final Set unsubscibedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); + final Set punsubscibedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); public RedisPubSubConnection(RedisClient redisClient, Channel channel, CompletableFuture connectionPromise) { super(redisClient, channel, connectionPromise); @@ -88,22 +90,28 @@ public class RedisPubSubConnection extends RedisConnection { return async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channels); } - public ChannelFuture unsubscribe(ChannelName... channels) { - synchronized (this) { + public ChannelFuture unsubscribe(PubSubType type, ChannelName... channels) { + RedisCommand command; + if (type == PubSubType.UNSUBSCRIBE) { + command = RedisCommands.UNSUBSCRIBE; for (ChannelName ch : channels) { this.channels.remove(ch); unsubscibedChannels.add(ch); } + } else { + command = RedisCommands.PUNSUBSCRIBE; + for (ChannelName ch : channels) { + patternChannels.remove(ch); + punsubscibedChannels.add(ch); + } } - ChannelFuture future = async((MultiDecoder) null, RedisCommands.UNSUBSCRIBE, channels); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - for (ChannelName channel : channels) { - removeDisconnectListener(channel); - onMessage(new PubSubStatusMessage(PubSubType.UNSUBSCRIBE, channel)); - } + + ChannelFuture future = async((MultiDecoder) null, command, channels); + future.addListener((FutureListener) f -> { + if (!f.isSuccess()) { + for (ChannelName channel : channels) { + removeDisconnectListener(channel); + onMessage(new PubSubStatusMessage(type, channel)); } } }); @@ -111,52 +119,22 @@ public class RedisPubSubConnection extends RedisConnection { } public void removeDisconnectListener(ChannelName channel) { - synchronized (this) { - unsubscibedChannels.remove(channel); - punsubscibedChannels.remove(channel); - } + unsubscibedChannels.remove(channel); + punsubscibedChannels.remove(channel); } @Override public void fireDisconnected() { super.fireDisconnected(); - Set channels = new HashSet(); - Set pchannels = new HashSet(); - synchronized (this) { - channels.addAll(unsubscibedChannels); - pchannels.addAll(punsubscibedChannels); - } - for (ChannelName channel : channels) { + for (ChannelName channel : unsubscibedChannels) { onMessage(new PubSubStatusMessage(PubSubType.UNSUBSCRIBE, channel)); } - for (ChannelName channel : pchannels) { + for (ChannelName channel : punsubscibedChannels) { onMessage(new PubSubStatusMessage(PubSubType.PUNSUBSCRIBE, channel)); } } - public ChannelFuture punsubscribe(ChannelName... channels) { - synchronized (this) { - for (ChannelName ch : channels) { - patternChannels.remove(ch); - punsubscibedChannels.add(ch); - } - } - ChannelFuture future = async((MultiDecoder) null, RedisCommands.PUNSUBSCRIBE, channels); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - for (ChannelName channel : channels) { - removeDisconnectListener(channel); - onMessage(new PubSubStatusMessage(PubSubType.PUNSUBSCRIBE, channel)); - } - } - } - }); - return future; - } - private ChannelFuture async(MultiDecoder messageDecoder, RedisCommand command, Object... params) { CompletableFuture promise = new CompletableFuture<>(); return channel.writeAndFlush(new CommandData<>(promise, messageDecoder, null, command, params)); diff --git a/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java b/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java index a812e02ae..e38e27f02 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java +++ b/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java @@ -19,8 +19,11 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import io.netty.channel.ChannelFutureListener; import org.redisson.PubSubMessageListener; import org.redisson.PubSubPatternMessageListener; import org.redisson.client.BaseRedisPubSubListener; @@ -29,9 +32,11 @@ import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.SubscribeListener; import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.client.protocol.pubsub.PubSubType; import io.netty.channel.ChannelFuture; +import org.redisson.connection.ConnectionManager; /** * @@ -48,10 +53,13 @@ public class PubSubConnectionEntry { private static final Queue> EMPTY_QUEUE = new LinkedList<>(); - public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) { + private final ConnectionManager connectionManager; + + public PubSubConnectionEntry(RedisPubSubConnection conn, ConnectionManager connectionManager) { super(); this.conn = conn; - this.subscribedChannelsAmount = new AtomicInteger(subscriptionsPerConnection); + this.connectionManager = connectionManager; + this.subscribedChannelsAmount = new AtomicInteger(connectionManager.getConfig().getSubscriptionsPerConnection()); } public int countListeners(ChannelName channelName) { @@ -176,11 +184,11 @@ public class PubSubConnectionEntry { return listener; } - public ChannelFuture unsubscribe(ChannelName channel, RedisPubSubListener listener) { + public void unsubscribe(PubSubType commandType, ChannelName channel, RedisPubSubListener listener, AtomicBoolean executed) { conn.addListener(new BaseRedisPubSubListener() { @Override public boolean onStatus(PubSubType type, CharSequence ch) { - if (type == PubSubType.UNSUBSCRIBE && channel.equals(ch)) { + if (type == commandType && channel.equals(ch)) { conn.removeListener(this); removeListeners(channel); if (listener != null) { @@ -190,9 +198,21 @@ public class PubSubConnectionEntry { } return false; } + }); + ChannelFuture future = conn.unsubscribe(commandType, channel); + future.addListener((ChannelFutureListener) f -> { + if (!f.isSuccess()) { + return; + } + + connectionManager.newTimeout(timeout -> { + if (executed.get()) { + return; + } + conn.onMessage(new PubSubStatusMessage(commandType, channel)); + }, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); }); - return conn.unsubscribe(channel); } private void removeListeners(ChannelName channel) { @@ -210,24 +230,6 @@ public class PubSubConnectionEntry { } } - public ChannelFuture punsubscribe(final ChannelName channel, final RedisPubSubListener listener) { - conn.addListener(new BaseRedisPubSubListener() { - @Override - public boolean onStatus(PubSubType type, CharSequence ch) { - if (type == PubSubType.PUNSUBSCRIBE && channel.equals(ch)) { - conn.removeListener(this); - removeListeners(channel); - if (listener != null) { - listener.onStatus(type, ch); - } - return true; - } - return false; - } - }); - return conn.punsubscribe(channel); - } - public RedisPubSubConnection getConnection() { return conn; } diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index 2b59ef0af..b4bdc44ff 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -21,7 +21,6 @@ import io.netty.util.Timeout; import org.redisson.PubSubPatternStatusListener; import org.redisson.client.*; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.ConnectionManager; @@ -409,7 +408,7 @@ public class PublishSubscribeService { } freePubSubLock.acquire(() -> { - PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); + PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, connectionManager); int remainFreeAmount = entry.tryAcquire(); PubSubKey key = new PubSubKey(channelName, msEntry); @@ -482,26 +481,7 @@ public class PublishSubscribeService { }; - ChannelFuture future; - if (topicType == PubSubType.UNSUBSCRIBE) { - future = entry.unsubscribe(channelName, listener); - } else { - future = entry.punsubscribe(channelName, listener); - } - - future.addListener((ChannelFutureListener) f -> { - if (!f.isSuccess()) { - return; - } - - connectionManager.newTimeout(timeout -> { - if (executed.get()) { - return; - } - entry.getConnection().onMessage(new PubSubStatusMessage(topicType, channelName)); - }, config.getTimeout(), TimeUnit.MILLISECONDS); - }); - + entry.unsubscribe(topicType, channelName, listener, executed); return result; } @@ -558,25 +538,7 @@ public class PublishSubscribeService { }; - ChannelFuture future; - if (topicType == PubSubType.PUNSUBSCRIBE) { - future = entry.punsubscribe(channelName, listener); - } else { - future = entry.unsubscribe(channelName, listener); - } - - future.addListener((ChannelFutureListener) f -> { - if (!f.isSuccess()) { - return; - } - - connectionManager.newTimeout(timeout -> { - if (executed.get()) { - return; - } - entry.getConnection().onMessage(new PubSubStatusMessage(topicType, channelName)); - }, config.getTimeout(), TimeUnit.MILLISECONDS); - }); + entry.unsubscribe(topicType, channelName, listener, executed); }); });