Feature - RTopic.countSubscribers method added. #1472

pull/1827/head
Nikita Koksharov 6 years ago
parent e2690921a3
commit cd4326d6d2

@ -27,6 +27,8 @@ import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.MasterSlaveServersConfig;
@ -80,7 +82,7 @@ public class RedissonTopic implements RTopic {
@Override
public RFuture<Long> publishAsync(Object message) {
return commandExecutor.writeAsync(name, codec, RedisCommands.PUBLISH, name, encode(message));
return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PUBLISH, name, encode(message));
}
protected ByteBuf encode(Object value) {
@ -273,4 +275,14 @@ public class RedissonTopic implements RTopic {
return 0;
}
@Override
public RFuture<Long> countSubscribersAsync() {
return commandExecutor.writeAsync(name, LongCodec.INSTANCE, RedisCommands.PUBSUB_NUMSUB, name);
}
@Override
public long countSubscribers() {
return commandExecutor.get(countSubscribersAsync());
}
}

@ -85,9 +85,17 @@ public interface RTopic extends RTopicAsync {
void removeAllListeners();
/**
* Returns amount of registered listeners
* Returns amount of registered listeners to this topic
*
* @return amount of listeners
*/
int countListeners();
/**
* Returns amount of subscribers to this topic across all Redisson instances.
* Each subscriber may have multiple listeners.
*
* @return amount of subscribers
*/
long countSubscribers();
}

@ -72,4 +72,12 @@ public interface RTopicAsync {
*/
RFuture<Void> removeListenerAsync(MessageListener<?> listener);
/**
* Returns amount of subscribers to this topic across all Redisson instances.
* Each subscriber may have multiple listeners.
*
* @return amount of subscribers
*/
RFuture<Long> countSubscribersAsync();
}

@ -72,4 +72,13 @@ public interface RTopicReactive {
* @param listenerId - listener id
*/
void removeListener(int listenerId);
/**
* Returns amount of subscribers to this topic across all Redisson instances.
* Each subscriber may have multiple listeners.
*
* @return amount of subscribers
*/
Publisher<Long> countSubscribers();
}

@ -82,5 +82,13 @@ public interface RTopicRx {
* @return stream of messages
*/
<M> Flowable<M> getMessages(Class<M> type);
/**
* Returns amount of subscribers to this topic across all Redisson instances.
* Each subscriber may have multiple listeners.
*
* @return amount of subscribers
*/
Flowable<Long> countSubscribers();
}

@ -426,6 +426,7 @@ public interface RedisCommands {
RedisStrictCommand<Void> QUIT = new RedisStrictCommand<Void>("QUIT", new VoidReplayConvertor());
RedisStrictCommand<Long> PUBLISH = new RedisStrictCommand<Long>("PUBLISH");
RedisCommand<Long> PUBSUB_NUMSUB = new RedisCommand<Long>("PUBSUB", "NUMSUB", new ListObjectDecoder<Long>(1));
RedisCommand<Object> SUBSCRIBE = new RedisCommand<Object>("SUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> UNSUBSCRIBE = new RedisCommand<Object>("UNSUBSCRIBE", new PubSubStatusDecoder());

@ -107,6 +107,20 @@ public class RedissonTopicTest {
}
@Test
public void testCountSubscribers() {
RedissonClient redisson = BaseTest.createInstance();
RTopic topic1 = redisson.getTopic("topic", LongCodec.INSTANCE);
assertThat(topic1.countSubscribers()).isZero();
int id = topic1.addListener(Long.class, (channel, msg) -> {
});
assertThat(topic1.countSubscribers()).isOne();
topic1.removeListener(id);
assertThat(topic1.countSubscribers()).isZero();
redisson.shutdown();
}
@Test
public void testCountListeners() {
RedissonClient redisson = BaseTest.createInstance();

Loading…
Cancel
Save