diff --git a/src/main/java/org/redisson/RedisPubSubTopicListenerWrapper.java b/src/main/java/org/redisson/RedisPubSubTopicListenerWrapper.java index 32222fa16..d8704dde2 100644 --- a/src/main/java/org/redisson/RedisPubSubTopicListenerWrapper.java +++ b/src/main/java/org/redisson/RedisPubSubTopicListenerWrapper.java @@ -80,7 +80,8 @@ public class RedisPubSubTopicListenerWrapper implements RedisPubSubListener implements RedisPubSubListener { @Override - public void onStatus(Type type, String channel) { + public boolean onStatus(Type type, String channel) { + return false; } @Override diff --git a/src/main/java/org/redisson/client/OnceRedisPubSubListener.java b/src/main/java/org/redisson/client/OneShotPubSubListener.java similarity index 74% rename from src/main/java/org/redisson/client/OnceRedisPubSubListener.java rename to src/main/java/org/redisson/client/OneShotPubSubListener.java index ec9c00319..5059bdb71 100644 --- a/src/main/java/org/redisson/client/OnceRedisPubSubListener.java +++ b/src/main/java/org/redisson/client/OneShotPubSubListener.java @@ -17,21 +17,24 @@ package org.redisson.client; import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type; -public class OnceRedisPubSubListener implements RedisPubSubListener { +public class OneShotPubSubListener implements RedisPubSubListener { private RedisPubSubConnection connection; private RedisPubSubListener listener; - public OnceRedisPubSubListener(RedisPubSubConnection connection, RedisPubSubListener listener) { + public OneShotPubSubListener(RedisPubSubConnection connection, RedisPubSubListener listener) { super(); this.connection = connection; this.listener = listener; } @Override - public void onStatus(Type type, String channel) { - listener.onStatus(type, channel); - connection.removeListener(this); + public boolean onStatus(Type type, String channel) { + if (listener.onStatus(type, channel)) { + connection.removeListener(this); + return true; + } + return false; } @Override diff --git a/src/main/java/org/redisson/client/RedisPubSubConnection.java b/src/main/java/org/redisson/client/RedisPubSubConnection.java index f6e12ea5c..e04062e96 100644 --- a/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -44,7 +44,7 @@ public class RedisPubSubConnection extends RedisConnection { } public void addOneShotListener(RedisPubSubListener listener) { - listeners.add(new OnceRedisPubSubListener(this, listener)); + listeners.add(new OneShotPubSubListener(this, listener)); } public void removeListener(RedisPubSubListener listener) { diff --git a/src/main/java/org/redisson/client/RedisPubSubListener.java b/src/main/java/org/redisson/client/RedisPubSubListener.java index 734eaf36f..16d684a19 100644 --- a/src/main/java/org/redisson/client/RedisPubSubListener.java +++ b/src/main/java/org/redisson/client/RedisPubSubListener.java @@ -19,7 +19,7 @@ import org.redisson.client.protocol.pubsub.PubSubStatusMessage; public interface RedisPubSubListener { - void onStatus(PubSubStatusMessage.Type type, String channel); + boolean onStatus(PubSubStatusMessage.Type type, String channel); void onMessage(String channel, V message); diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index ab987c8f7..ed916cc21 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -23,14 +23,12 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.protocol.Codec; -import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.misc.InfinitySemaphoreLatch; import io.netty.channel.EventLoopGroup; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; -import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 15ef9412c..4be7bb72f 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -337,14 +337,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager { entry.unsubscribe(channelName, new BaseRedisPubSubListener() { @Override - public void onStatus(Type type, String channel) { + public boolean onStatus(Type type, String channel) { if (type == Type.UNSUBSCRIBE && channel.equals(channelName)) { synchronized (entry) { if (entry.tryClose()) { returnSubscribeConnection(-1, entry); } } + return true; } + return false; } }); @@ -360,14 +362,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager { entry.punsubscribe(channelName, new BaseRedisPubSubListener() { @Override - public void onStatus(Type type, String channel) { + public boolean onStatus(Type type, String channel) { if (type == Type.PUNSUBSCRIBE && channel.equals(channelName)) { synchronized (entry) { if (entry.tryClose()) { returnSubscribeConnection(-1, entry); } } + return true; } + return false; } }); diff --git a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index f5455e83a..47f286ec1 100644 --- a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -137,7 +137,7 @@ public class PubSubConnectionEntry { public void unsubscribe(final String channel, RedisPubSubListener listener) { conn.addOneShotListener(new BaseRedisPubSubListener() { @Override - public void onStatus(Type type, String ch) { + public boolean onStatus(Type type, String ch) { if (type == Type.UNSUBSCRIBE && channel.equals(ch)) { Queue listeners = channelListeners.get(channel); if (listeners != null) { @@ -146,7 +146,9 @@ public class PubSubConnectionEntry { } } subscribedChannelsAmount.release(); + return true; } + return false; } }); conn.addOneShotListener(listener); @@ -156,7 +158,7 @@ public class PubSubConnectionEntry { public void punsubscribe(final String channel, RedisPubSubListener listener) { conn.addOneShotListener(new BaseRedisPubSubListener() { @Override - public void onStatus(Type type, String ch) { + public boolean onStatus(Type type, String ch) { if (type == Type.PUNSUBSCRIBE && channel.equals(ch)) { Queue listeners = channelListeners.get(channel); if (listeners != null) { @@ -165,7 +167,9 @@ public class PubSubConnectionEntry { } } subscribedChannelsAmount.release(); + return true; } + return false; } }); conn.addOneShotListener(listener); diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 19ee6828b..d5035b4dc 100644 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -33,14 +33,10 @@ import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.StringCodec; -import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; - public class SentinelConnectionManager extends MasterSlaveConnectionManager { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -133,10 +129,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } @Override - public void onStatus(Type type, String channel) { + public boolean onStatus(Type type, String channel) { if (type == Type.SUBSCRIBE) { log.info("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort()); } + return true; } }); diff --git a/src/test/java/org/redisson/RedisClientTest.java b/src/test/java/org/redisson/RedisClientTest.java index 3ee00f19b..10f161018 100644 --- a/src/test/java/org/redisson/RedisClientTest.java +++ b/src/test/java/org/redisson/RedisClientTest.java @@ -21,10 +21,8 @@ import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.LongCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.StringCodec; -import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type; -import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; public class RedisClientTest { @@ -37,10 +35,11 @@ public class RedisClientTest { pubSubConnection.addListener(new RedisPubSubListener() { @Override - public void onStatus(Type type, String channel) { + public boolean onStatus(Type type, String channel) { Assert.assertEquals(Type.SUBSCRIBE, type); Assert.assertTrue(Arrays.asList("test1", "test2").contains(channel)); latch.countDown(); + return true; } @Override