diff --git a/redisson/src/main/java/org/redisson/api/RStreamReactive.java b/redisson/src/main/java/org/redisson/api/RStreamReactive.java index 5cc9bc8ed..7dc84fa9c 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamReactive.java +++ b/redisson/src/main/java/org/redisson/api/RStreamReactive.java @@ -53,6 +53,32 @@ public interface RStreamReactive extends RExpirableReactive { */ Publisher createGroup(String groupName, StreamMessageId id); + /** + * Removes group by name. + * + * @param groupName - name of group + * @return void + */ + Publisher removeGroup(String groupName); + + /** + * Removes consumer of the group by name. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @return number of pending messages owned by consumer + */ + Publisher removeConsumer(String groupName, String consumerName); + + /** + * Updates next message id delivered to consumers. + * + * @param groupName - name of group + * @param id - Stream Message ID + * @return void + */ + Publisher updateGroupMessageId(String groupName, StreamMessageId id); + /** * Marks pending messages by group name and stream ids as correctly processed. * @@ -99,7 +125,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param count - amount of messages * @return list */ - Publisher> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count, String consumerName); + Publisher> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count); /** * Transfers ownership of pending messages by id to a new consumer @@ -483,5 +509,29 @@ public interface RStreamReactive extends RExpirableReactive { * @return stream data mapped by Stream ID */ Publisher>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId); + + /** + * Removes messages by id. + * + * @param ids - id of messages to remove + * @return deleted messages amount + */ + Publisher remove(StreamMessageId... ids); + + /** + * Trims stream to specified size + * + * @param size - new size of stream + * @return number of deleted messages + */ + Publisher trim(int size); + + /** + * Trims stream to few tens of entries more than specified length to trim. + * + * @param size - new size of stream + * @return number of deleted messages + */ + Publisher trimNonStrict(int size); } diff --git a/redisson/src/main/java/org/redisson/api/RStreamRx.java b/redisson/src/main/java/org/redisson/api/RStreamRx.java index 6b612bf66..681aaa016 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamRx.java +++ b/redisson/src/main/java/org/redisson/api/RStreamRx.java @@ -53,6 +53,32 @@ public interface RStreamRx extends RExpirableRx { */ Flowable createGroup(String groupName, StreamMessageId id); + /** + * Removes group by name. + * + * @param groupName - name of group + * @return void + */ + Flowable removeGroup(String groupName); + + /** + * Removes consumer of the group by name. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @return number of pending messages owned by consumer + */ + Flowable removeConsumer(String groupName, String consumerName); + + /** + * Updates next message id delivered to consumers. + * + * @param groupName - name of group + * @param id - Stream Message ID + * @return void + */ + Flowable updateGroupMessageId(String groupName, StreamMessageId id); + /** * Marks pending messages by group name and stream ids as correctly processed. * @@ -99,7 +125,7 @@ public interface RStreamRx extends RExpirableRx { * @param count - amount of messages * @return list */ - Flowable> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count, String consumerName); + Flowable> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count); /** * Transfers ownership of pending messages by id to a new consumer @@ -484,4 +510,28 @@ public interface RStreamRx extends RExpirableRx { */ Flowable>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId); + /** + * Removes messages by id. + * + * @param ids - id of messages to remove + * @return deleted messages amount + */ + Flowable remove(StreamMessageId... ids); + + /** + * Trims stream to specified size + * + * @param size - new size of stream + * @return number of deleted messages + */ + Flowable trim(int size); + + /** + * Trims stream to few tens of entries more than specified length to trim. + * + * @param size - new size of stream + * @return number of deleted messages + */ + Flowable trimNonStrict(int size); + }