diff --git a/redisson/src/main/java/org/redisson/RedissonStream.java b/redisson/src/main/java/org/redisson/RedissonStream.java index d2fb10204..87681eeb8 100644 --- a/redisson/src/main/java/org/redisson/RedissonStream.java +++ b/redisson/src/main/java/org/redisson/RedissonStream.java @@ -972,6 +972,16 @@ public class RedissonStream extends RedissonExpirable implements RStream createConsumerAsync(String groupName, String consumerName) { + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "CREATECONSUMER", getName(), groupName, consumerName); + } + @Override public RFuture removeConsumerAsync(String groupName, String consumerName) { return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP_LONG, "DELCONSUMER", getName(), groupName, consumerName); diff --git a/redisson/src/main/java/org/redisson/api/RStream.java b/redisson/src/main/java/org/redisson/api/RStream.java index 77bfd3143..afe9dfff6 100644 --- a/redisson/src/main/java/org/redisson/api/RStream.java +++ b/redisson/src/main/java/org/redisson/api/RStream.java @@ -58,6 +58,16 @@ public interface RStream extends RStreamAsync, RExpirable { */ void removeGroup(String groupName); + /** + * Creates consumer of the group by name. + *

+ * Requires Redis 6.2.0 and higher. + * + * @param groupName - name of group + * @param consumerName - name of consumer + */ + void createConsumer(String groupName, String consumerName); + /** * Removes consumer of the group by name. * diff --git a/redisson/src/main/java/org/redisson/api/RStreamAsync.java b/redisson/src/main/java/org/redisson/api/RStreamAsync.java index 16ab3577c..ed79b4cdc 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamAsync.java +++ b/redisson/src/main/java/org/redisson/api/RStreamAsync.java @@ -59,6 +59,16 @@ public interface RStreamAsync extends RExpirableAsync { */ RFuture removeGroupAsync(String groupName); + /** + * Creates consumer of the group by name. + *

+ * Requires Redis 6.2.0 and higher. + * + * @param groupName - name of group + * @param consumerName - name of consumer + */ + RFuture createConsumerAsync(String groupName, String consumerName); + /** * Removes consumer of the group by name. * diff --git a/redisson/src/main/java/org/redisson/api/RStreamReactive.java b/redisson/src/main/java/org/redisson/api/RStreamReactive.java index 17b0840eb..f71fc9183 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamReactive.java +++ b/redisson/src/main/java/org/redisson/api/RStreamReactive.java @@ -61,6 +61,16 @@ public interface RStreamReactive extends RExpirableReactive { */ Mono removeGroup(String groupName); + /** + * Creates consumer of the group by name. + *

+ * Requires Redis 6.2.0 and higher. + * + * @param groupName - name of group + * @param consumerName - name of consumer + */ + Mono createConsumer(String groupName, String consumerName); + /** * Removes consumer of the group by name. * diff --git a/redisson/src/main/java/org/redisson/api/RStreamRx.java b/redisson/src/main/java/org/redisson/api/RStreamRx.java index d6bd449fa..9fc3e9e4d 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamRx.java +++ b/redisson/src/main/java/org/redisson/api/RStreamRx.java @@ -62,6 +62,16 @@ public interface RStreamRx extends RExpirableRx { */ Completable removeGroup(String groupName); + /** + * Creates consumer of the group by name. + *

+ * Requires Redis 6.2.0 and higher. + * + * @param groupName - name of group + * @param consumerName - name of consumer + */ + Completable createConsumer(String groupName, String consumerName); + /** * Removes consumer of the group by name. *