diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 7c846f5cd..6ea6d403f 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -27,6 +27,8 @@ import org.redisson.client.ChannelName; import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisTimeoutException; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.config.MasterSlaveServersConfig; @@ -80,7 +82,7 @@ public class RedissonTopic implements RTopic { @Override public RFuture publishAsync(Object message) { - return commandExecutor.writeAsync(name, codec, RedisCommands.PUBLISH, name, encode(message)); + return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PUBLISH, name, encode(message)); } protected ByteBuf encode(Object value) { @@ -273,4 +275,14 @@ public class RedissonTopic implements RTopic { return 0; } + @Override + public RFuture countSubscribersAsync() { + return commandExecutor.writeAsync(name, LongCodec.INSTANCE, RedisCommands.PUBSUB_NUMSUB, name); + } + + @Override + public long countSubscribers() { + return commandExecutor.get(countSubscribersAsync()); + } + } diff --git a/redisson/src/main/java/org/redisson/api/RTopic.java b/redisson/src/main/java/org/redisson/api/RTopic.java index 63a0e67d5..d75c9f537 100644 --- a/redisson/src/main/java/org/redisson/api/RTopic.java +++ b/redisson/src/main/java/org/redisson/api/RTopic.java @@ -85,9 +85,17 @@ public interface RTopic extends RTopicAsync { void removeAllListeners(); /** - * Returns amount of registered listeners + * Returns amount of registered listeners to this topic * * @return amount of listeners */ int countListeners(); + + /** + * Returns amount of subscribers to this topic across all Redisson instances. + * Each subscriber may have multiple listeners. + * + * @return amount of subscribers + */ + long countSubscribers(); } diff --git a/redisson/src/main/java/org/redisson/api/RTopicAsync.java b/redisson/src/main/java/org/redisson/api/RTopicAsync.java index 380d8851c..1b69136bb 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicAsync.java +++ b/redisson/src/main/java/org/redisson/api/RTopicAsync.java @@ -72,4 +72,12 @@ public interface RTopicAsync { */ RFuture removeListenerAsync(MessageListener listener); + /** + * Returns amount of subscribers to this topic across all Redisson instances. + * Each subscriber may have multiple listeners. + * + * @return amount of subscribers + */ + RFuture countSubscribersAsync(); + } diff --git a/redisson/src/main/java/org/redisson/api/RTopicReactive.java b/redisson/src/main/java/org/redisson/api/RTopicReactive.java index 27385f46a..9457a00f1 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicReactive.java +++ b/redisson/src/main/java/org/redisson/api/RTopicReactive.java @@ -72,4 +72,13 @@ public interface RTopicReactive { * @param listenerId - listener id */ void removeListener(int listenerId); + + /** + * Returns amount of subscribers to this topic across all Redisson instances. + * Each subscriber may have multiple listeners. + * + * @return amount of subscribers + */ + Publisher countSubscribers(); + } diff --git a/redisson/src/main/java/org/redisson/api/RTopicRx.java b/redisson/src/main/java/org/redisson/api/RTopicRx.java index eedfbf723..ffdc929a5 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicRx.java +++ b/redisson/src/main/java/org/redisson/api/RTopicRx.java @@ -82,5 +82,13 @@ public interface RTopicRx { * @return stream of messages */ Flowable getMessages(Class type); + + /** + * Returns amount of subscribers to this topic across all Redisson instances. + * Each subscriber may have multiple listeners. + * + * @return amount of subscribers + */ + Flowable countSubscribers(); } diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 825c67a10..75ce207ef 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -426,6 +426,7 @@ public interface RedisCommands { RedisStrictCommand QUIT = new RedisStrictCommand("QUIT", new VoidReplayConvertor()); RedisStrictCommand PUBLISH = new RedisStrictCommand("PUBLISH"); + RedisCommand PUBSUB_NUMSUB = new RedisCommand("PUBSUB", "NUMSUB", new ListObjectDecoder(1)); RedisCommand SUBSCRIBE = new RedisCommand("SUBSCRIBE", new PubSubStatusDecoder()); RedisCommand UNSUBSCRIBE = new RedisCommand("UNSUBSCRIBE", new PubSubStatusDecoder()); diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 19da8d3c2..a7e36f4d8 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -107,6 +107,20 @@ public class RedissonTopicTest { } + @Test + public void testCountSubscribers() { + RedissonClient redisson = BaseTest.createInstance(); + RTopic topic1 = redisson.getTopic("topic", LongCodec.INSTANCE); + assertThat(topic1.countSubscribers()).isZero(); + int id = topic1.addListener(Long.class, (channel, msg) -> { + }); + assertThat(topic1.countSubscribers()).isOne(); + topic1.removeListener(id); + assertThat(topic1.countSubscribers()).isZero(); + + redisson.shutdown(); + } + @Test public void testCountListeners() { RedissonClient redisson = BaseTest.createInstance();