RStreamReactive and RStreamRx synced with RStream

pull/1821/head
Nikita Koksharov 6 years ago
parent 4632aee75a
commit 6a4414b40c

@ -53,6 +53,32 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
*/
Publisher<Void> createGroup(String groupName, StreamMessageId id);
/**
* Removes group by name.
*
* @param groupName - name of group
* @return void
*/
Publisher<Void> removeGroup(String groupName);
/**
* Removes consumer of the group by name.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @return number of pending messages owned by consumer
*/
Publisher<Long> removeConsumer(String groupName, String consumerName);
/**
* Updates next message id delivered to consumers.
*
* @param groupName - name of group
* @param id - Stream Message ID
* @return void
*/
Publisher<Void> updateGroupMessageId(String groupName, StreamMessageId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
*
@ -99,7 +125,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param count - amount of messages
* @return list
*/
Publisher<List<PendingEntry>> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count, String consumerName);
Publisher<List<PendingEntry>> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Transfers ownership of pending messages by id to a new consumer
@ -483,5 +509,29 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamMessageId, Map<K, V>>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Removes messages by id.
*
* @param ids - id of messages to remove
* @return deleted messages amount
*/
Publisher<Long> remove(StreamMessageId... ids);
/**
* Trims stream to specified size
*
* @param size - new size of stream
* @return number of deleted messages
*/
Publisher<Long> trim(int size);
/**
* Trims stream to few tens of entries more than specified length to trim.
*
* @param size - new size of stream
* @return number of deleted messages
*/
Publisher<Long> trimNonStrict(int size);
}

@ -53,6 +53,32 @@ public interface RStreamRx<K, V> extends RExpirableRx {
*/
Flowable<Void> createGroup(String groupName, StreamMessageId id);
/**
* Removes group by name.
*
* @param groupName - name of group
* @return void
*/
Flowable<Void> removeGroup(String groupName);
/**
* Removes consumer of the group by name.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @return number of pending messages owned by consumer
*/
Flowable<Long> removeConsumer(String groupName, String consumerName);
/**
* Updates next message id delivered to consumers.
*
* @param groupName - name of group
* @param id - Stream Message ID
* @return void
*/
Flowable<Void> updateGroupMessageId(String groupName, StreamMessageId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
*
@ -99,7 +125,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param count - amount of messages
* @return list
*/
Flowable<List<PendingEntry>> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count, String consumerName);
Flowable<List<PendingEntry>> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Transfers ownership of pending messages by id to a new consumer
@ -484,4 +510,28 @@ public interface RStreamRx<K, V> extends RExpirableRx {
*/
Flowable<Map<StreamMessageId, Map<K, V>>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Removes messages by id.
*
* @param ids - id of messages to remove
* @return deleted messages amount
*/
Flowable<Long> remove(StreamMessageId... ids);
/**
* Trims stream to specified size
*
* @param size - new size of stream
* @return number of deleted messages
*/
Flowable<Long> trim(int size);
/**
* Trims stream to few tens of entries more than specified length to trim.
*
* @param size - new size of stream
* @return number of deleted messages
*/
Flowable<Long> trimNonStrict(int size);
}

Loading…
Cancel
Save