Feature - RShardedTopic.countSubscribers() method implemented

pull/5676/head
Nikita Koksharov 11 months ago
parent 3470e8cc8c
commit a2e833770f

@ -21,6 +21,7 @@ import org.redisson.api.RShardedTopic;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
@ -88,6 +89,6 @@ public class RedissonShardedTopic extends RedissonTopic implements RShardedTopic
@Override
public RFuture<Long> countSubscribersAsync() {
throw new UnsupportedOperationException("Sharded PUBSUB doesn't support this operation");
return commandExecutor.writeAsync(name, LongCodec.INSTANCE, RedisCommands.PUBSUB_SHARDNUMSUB, channelName);
}
}

@ -15,88 +15,12 @@
*/
package org.redisson.api;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.StatusListener;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
/**
* Reactive interface for Sharded Topic. Messages are delivered to message listeners connected to the same Topic.
*
* @author Nikita Koksharov
*
*/
public interface RShardedTopicReactive {
/**
* Get topic channel names
*
* @return channel names
*/
List<String> getChannelNames();
/**
* Publish the message to all subscribers of this topic asynchronously
*
* @param message to send
* @return the <code>Future</code> object with number of clients that received the message
*/
Mono<Long> publish(Object message);
/**
* Subscribes to status changes of this topic
*
* @param listener for messages
* @return listener id
* @see org.redisson.api.listener.StatusListener
*/
Mono<Integer> addListener(StatusListener listener);
/**
* Subscribes to this topic.
* <code>MessageListener.onMessage</code> is called when any message
* is published on this topic.
*
* @param <M> type of message
* @param type - type of message
* @param listener for messages
* @return locally unique listener id
* @see org.redisson.api.listener.MessageListener
*/
<M> Mono<Integer> addListener(Class<M> type, MessageListener<M> listener);
/**
* Removes the listener by <code>id</code> for listening this topic
*
* @param listenerIds - message listener ids
* @return void
*/
Mono<Void> removeListener(Integer... listenerIds);
/**
* Removes the listener by <code>instance</code> for listening this topic
*
* @param listener - message listener
* @return void
*/
Mono<Void> removeListener(MessageListener<?> listener);
/**
* Returns continues stream of published messages.
*
* @param <M> type of message
* @param type - type of message to listen
* @return stream of messages
*/
<M> Flux<M> getMessages(Class<M> type);
/**
* Removes all listeners from this topic
*
* @return void
*/
Mono<Void> removeAllListeners();
public interface RShardedTopicReactive extends RTopicReactive {
}

@ -15,97 +15,12 @@
*/
package org.redisson.api;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.StatusListener;
import java.util.List;
/**
* RxJava3 interface for Sharded Topic. Messages are delivered to message listeners connected to the same Topic.
*
* @author Nikita Koksharov
*
*/
public interface RShardedTopicRx {
/**
* Get topic channel names
*
* @return channel names
*/
List<String> getChannelNames();
/**
* Publish the message to all subscribers of this topic asynchronously
*
* @param message to send
* @return the <code>Future</code> object with number of clients that received the message
*/
Single<Long> publish(Object message);
/**
* Subscribes to status changes of this topic
*
* @param listener for messages
* @return listener id
* @see org.redisson.api.listener.StatusListener
*/
Single<Integer> addListener(StatusListener listener);
/**
* Subscribes to this topic.
* <code>MessageListener.onMessage</code> is called when any message
* is published on this topic.
*
* @param <M> - type of message
* @param type - type of message
* @param listener for messages
* @return locally unique listener id
* @see org.redisson.api.listener.MessageListener
*/
<M> Single<Integer> addListener(Class<M> type, MessageListener<M> listener);
/**
* Removes the listener by <code>id</code> for listening this topic
*
* @param listenerIds - message listener ids
* @return void
*/
Completable removeListener(Integer... listenerIds);
/**
* Removes the listener by <code>instance</code> for listening this topic
*
* @param listener - message listener
* @return void
*/
Completable removeListener(MessageListener<?> listener);
/**
* Returns continues stream of published messages.
*
* @param <M> - type of message
* @param type - type of message to listen
* @return stream of messages
*/
<M> Flowable<M> getMessages(Class<M> type);
/**
* Returns amount of subscribers to this topic across all Redisson instances.
* Each subscriber may have multiple listeners.
*
* @return amount of subscribers
*/
Single<Long> countSubscribers();
/**
* Removes all listeners from this topic
*
* @return void
*/
Completable removeAllListeners();
public interface RShardedTopicRx extends RTopicRx {
}

@ -25,7 +25,7 @@ import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
/**
* RxJava2 interface for Publish Subscribe object. Messages are delivered to all message listeners across Redis cluster.
* RxJava3 interface for Publish Subscribe object. Messages are delivered to all message listeners across Redis cluster.
*
* @author Nikita Koksharov
*

@ -708,16 +708,17 @@ public interface RedisCommands {
RedisStrictCommand<Long> PUBLISH = new RedisStrictCommand<Long>("PUBLISH");
RedisStrictCommand<Long> SPUBLISH = new RedisStrictCommand<Long>("SPUBLISH");
RedisCommand<Long> PUBSUB_NUMSUB = new RedisCommand<Long>("PUBSUB", "NUMSUB", new ListObjectDecoder<Long>(1));
RedisCommand<Long> PUBSUB_NUMSUB = new RedisCommand<>("PUBSUB", "NUMSUB", new ListObjectDecoder<>(1));
RedisCommand<List<String>> PUBSUB_CHANNELS = new RedisStrictCommand<>("PUBSUB", "CHANNELS", new StringListReplayDecoder());
RedisCommand<List<String>> PUBSUB_SHARDCHANNELS = new RedisStrictCommand<>("PUBSUB", "SHARDCHANNELS", new StringListReplayDecoder());
RedisCommand<Object> SSUBSCRIBE = new RedisCommand<Object>("SSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> SUBSCRIBE = new RedisCommand<Object>("SUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> UNSUBSCRIBE = new RedisCommand<Object>("UNSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> SUNSUBSCRIBE = new RedisCommand<Object>("SUNSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> PSUBSCRIBE = new RedisCommand<Object>("PSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> PUNSUBSCRIBE = new RedisCommand<Object>("PUNSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Long> PUBSUB_SHARDNUMSUB = new RedisCommand<>("PUBSUB", "SHARDNUMSUB", new ListObjectDecoder<>(1));
RedisCommand<Object> SSUBSCRIBE = new RedisCommand<>("SSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> SUBSCRIBE = new RedisCommand<>("SUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> UNSUBSCRIBE = new RedisCommand<>("UNSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> SUNSUBSCRIBE = new RedisCommand<>("SUNSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> PSUBSCRIBE = new RedisCommand<>("PSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> PUNSUBSCRIBE = new RedisCommand<>("PUNSUBSCRIBE", new PubSubStatusDecoder());
Set<String> PUBSUB_COMMANDS = Collections.unmodifiableSet(new HashSet<>(
Arrays.asList(PSUBSCRIBE.getName(), SUBSCRIBE.getName(), PUNSUBSCRIBE.getName(),

@ -79,6 +79,8 @@ public class RedissonShardedTopicTest extends RedisDockerTest {
}
});
assertThat(topic.countSubscribers()).isEqualTo(1);
RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER);
for (RedisClusterMaster master : rnc.getMasters()) {
RedisClientConfig cc = new RedisClientConfig();
@ -95,7 +97,10 @@ public class RedissonShardedTopicTest extends RedisDockerTest {
Awaitility.waitAtMost(Duration.ofSeconds(30)).until(() -> subscriptions.get() == 2);
redisson.getShardedTopic("3").publish(1);
assertThat(executed.get()).isTrue();
Awaitility.waitAtMost(Duration.ofSeconds(1)).untilAsserted(() -> {
assertThat(executed.get()).isTrue();
});
redisson.shutdown();
});

Loading…
Cancel
Save