Feature - Add createConsumer() method to RStream object #3378

pull/3384/head
Nikita Koksharov 4 years ago
parent a4ceb11109
commit 4b8230fe22

@ -972,6 +972,16 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
get(removeGroupAsync(groupName));
}
@Override
public void createConsumer(String groupName, String consumerName) {
get(createConsumerAsync(groupName, consumerName));
}
@Override
public RFuture<Void> createConsumerAsync(String groupName, String consumerName) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "CREATECONSUMER", getName(), groupName, consumerName);
}
@Override
public RFuture<Long> removeConsumerAsync(String groupName, String consumerName) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP_LONG, "DELCONSUMER", getName(), groupName, consumerName);

@ -58,6 +58,16 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
*/
void removeGroup(String groupName);
/**
* Creates consumer of the group by name.
* <p>
* Requires <b>Redis 6.2.0 and higher.</b>
*
* @param groupName - name of group
* @param consumerName - name of consumer
*/
void createConsumer(String groupName, String consumerName);
/**
* Removes consumer of the group by name.
*

@ -59,6 +59,16 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
*/
RFuture<Void> removeGroupAsync(String groupName);
/**
* Creates consumer of the group by name.
* <p>
* Requires <b>Redis 6.2.0 and higher.</b>
*
* @param groupName - name of group
* @param consumerName - name of consumer
*/
RFuture<Void> createConsumerAsync(String groupName, String consumerName);
/**
* Removes consumer of the group by name.
*

@ -61,6 +61,16 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
*/
Mono<Void> removeGroup(String groupName);
/**
* Creates consumer of the group by name.
* <p>
* Requires <b>Redis 6.2.0 and higher.</b>
*
* @param groupName - name of group
* @param consumerName - name of consumer
*/
Mono<Void> createConsumer(String groupName, String consumerName);
/**
* Removes consumer of the group by name.
*

@ -62,6 +62,16 @@ public interface RStreamRx<K, V> extends RExpirableRx {
*/
Completable removeGroup(String groupName);
/**
* Creates consumer of the group by name.
* <p>
* Requires <b>Redis 6.2.0 and higher.</b>
*
* @param groupName - name of group
* @param consumerName - name of consumer
*/
Completable createConsumer(String groupName, String consumerName);
/**
* Removes consumer of the group by name.
*

Loading…
Cancel
Save