diff --git a/redisson/src/main/java/org/redisson/api/RStreamReactive.java b/redisson/src/main/java/org/redisson/api/RStreamReactive.java index f71fc9183..d74528ccd 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamReactive.java +++ b/redisson/src/main/java/org/redisson/api/RStreamReactive.java @@ -19,6 +19,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.redisson.api.stream.StreamAddArgs; +import org.redisson.api.stream.TrimStrategy; import reactor.core.publisher.Mono; /** @@ -486,91 +488,75 @@ public interface RStreamReactive extends RExpirableReactive { Mono size(); /** - * Appends a new entry and returns generated Stream ID - * - * @param key - key of entry - * @param value - value of entry - * @return Stream ID + * Appends a new entry/entries and returns generated Stream Message ID + * + * @param args - method arguments object + * @return Stream Message ID */ + Mono add(StreamAddArgs args); + + /** + * Appends a new entry/entries by specified Stream Message ID + * + * @param id - Stream Message ID + * @param args - method arguments object + */ + Mono add(StreamMessageId id, StreamAddArgs args); + + /* + * Use add(StreamAddArgs) method instead + * + */ + @Deprecated Mono add(K key, V value); - /** - * Appends a new entry by specified Stream ID - * - * @param id - Stream ID - * @param key - key of entry - * @param value - value of entry - * @return void + /* + * Use add(StreamMessageId, StreamAddArgs) method instead + * */ + @Deprecated Mono add(StreamMessageId id, K key, V value); - /** - * Appends a new entry and returns generated Stream ID. - * Trims stream to a specified trimLen size. - * If trimStrict is false then trims to few tens of entries more than specified length to trim. - * - * @param key - key of entry - * @param value - value of entry - * @param trimLen - length to trim - * @param trimStrict - if false then trims to few tens of entries more than specified length to trim - * @return Stream ID + /* + * Use add(StreamAddArgs) method instead + * */ + @Deprecated Mono add(K key, V value, int trimLen, boolean trimStrict); - /** - * Appends a new entry by specified Stream ID. - * Trims stream to a specified trimLen size. - * If trimStrict is false then trims to few tens of entries more than specified length to trim. - * - * @param id - Stream ID - * @param key - key of entry - * @param value - value of entry - * @param trimLen - length to trim - * @param trimStrict - if false then trims to few tens of entries more than specified length to trim - * @return void + /* + * Use add(StreamMessageId, StreamAddArgs) method instead + * */ + @Deprecated Mono add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict); - /** - * Appends new entries and returns generated Stream ID - * - * @param entries - entries to add - * @return Stream ID + /* + * Use add(StreamAddArgs) method instead + * */ + @Deprecated Mono addAll(Map entries); - /** - * Appends new entries by specified Stream ID - * - * @param id - Stream ID - * @param entries - entries to add - * @return void + /* + * Use add(StreamMessageId, StreamAddArgs) method instead + * */ + @Deprecated Mono addAll(StreamMessageId id, Map entries); - /** - * Appends new entries and returns generated Stream ID. - * Trims stream to a specified trimLen size. - * If trimStrict is false then trims to few tens of entries more than specified length to trim. - * - * @param entries - entries to add - * @param trimLen - length to trim - * @param trimStrict - if false then trims to few tens of entries more than specified length to trim - * @return Stream ID + /* + * Use add(StreamAddArgs) method instead + * */ + @Deprecated Mono addAll(Map entries, int trimLen, boolean trimStrict); - /** - * Appends new entries by specified Stream ID. - * Trims stream to a specified trimLen size. - * If trimStrict is false then trims to few tens of entries more than specified length to trim. - * - * @param id - Stream ID - * @param entries - entries to add - * @param trimLen - length to trim - * @param trimStrict - if false then trims to few tens of entries more than specified length to trim - * @return void + /* + * Use add(StreamMessageId, StreamAddArgs) method instead + * */ + @Deprecated Mono addAll(StreamMessageId id, Map entries, int trimLen, boolean trimStrict); /** @@ -823,6 +809,34 @@ public interface RStreamReactive extends RExpirableReactive { */ Mono trimNonStrict(int size); + /** + * Trims stream to specified size + * + * @param strategy - trim strategy + * @param threshold - new size of stream + * @return number of deleted messages + */ + Mono trim(TrimStrategy strategy, int threshold); + + /** + * Trims stream using almost exact trimming threshold. + * + * @param strategy - trim strategy + * @param threshold - trim threshold + * @return number of deleted messages + */ + Mono trimNonStrict(TrimStrategy strategy, int threshold); + + /** + * Trims stream using almost exact trimming threshold up to limit. + * + * @param strategy - trim strategy + * @param threshold - trim threshold + * @param limit - trim limit + * @return number of deleted messages + */ + Mono trimNonStrict(TrimStrategy strategy, int threshold, int limit); + /** * Returns information about this stream. * diff --git a/redisson/src/main/java/org/redisson/api/RStreamRx.java b/redisson/src/main/java/org/redisson/api/RStreamRx.java index 9fc3e9e4d..aa10f742b 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamRx.java +++ b/redisson/src/main/java/org/redisson/api/RStreamRx.java @@ -21,6 +21,8 @@ import java.util.concurrent.TimeUnit; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Single; +import org.redisson.api.stream.StreamAddArgs; +import org.redisson.api.stream.TrimStrategy; /** * Reactive interface for Redis Stream object. @@ -487,91 +489,75 @@ public interface RStreamRx extends RExpirableRx { Single size(); /** - * Appends a new entry and returns generated Stream ID - * - * @param key - key of entry - * @param value - value of entry - * @return Stream ID + * Appends a new entry/entries and returns generated Stream Message ID + * + * @param args - method arguments object + * @return Stream Message ID + */ + Single add(StreamAddArgs args); + + /** + * Appends a new entry/entries by specified Stream Message ID + * + * @param id - Stream Message ID + * @param args - method arguments object + */ + Completable add(StreamMessageId id, StreamAddArgs args); + + /* + * Use add(StreamAddArgs) method instead + * */ + @Deprecated Single add(K key, V value); - /** - * Appends a new entry by specified Stream ID - * - * @param id - Stream ID - * @param key - key of entry - * @param value - value of entry - * @return void + /* + * Use add(StreamMessageId, StreamAddArgs) method instead + * */ + @Deprecated Completable add(StreamMessageId id, K key, V value); - /** - * Appends a new entry and returns generated Stream ID. - * Trims stream to a specified trimLen size. - * If trimStrict is false then trims to few tens of entries more than specified length to trim. - * - * @param key - key of entry - * @param value - value of entry - * @param trimLen - length to trim - * @param trimStrict - if false then trims to few tens of entries more than specified length to trim - * @return Stream ID + /* + * Use add(StreamAddArgs) method instead + * */ + @Deprecated Single add(K key, V value, int trimLen, boolean trimStrict); - /** - * Appends a new entry by specified Stream ID. - * Trims stream to a specified trimLen size. - * If trimStrict is false then trims to few tens of entries more than specified length to trim. - * - * @param id - Stream ID - * @param key - key of entry - * @param value - value of entry - * @param trimLen - length to trim - * @param trimStrict - if false then trims to few tens of entries more than specified length to trim - * @return void + /* + * Use add(StreamMessageId, StreamAddArgs) method instead + * */ + @Deprecated Completable add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict); - /** - * Appends new entries and returns generated Stream ID - * - * @param entries - entries to add - * @return Stream ID + /* + * Use add(StreamAddArgs) method instead + * */ + @Deprecated Single addAll(Map entries); - /** - * Appends new entries by specified Stream ID - * - * @param id - Stream ID - * @param entries - entries to add - * @return void + /* + * Use add(StreamMessageId, StreamAddArgs) method instead + * */ + @Deprecated Completable addAll(StreamMessageId id, Map entries); - /** - * Appends new entries and returns generated Stream ID. - * Trims stream to a specified trimLen size. - * If trimStrict is false then trims to few tens of entries more than specified length to trim. - * - * @param entries - entries to add - * @param trimLen - length to trim - * @param trimStrict - if false then trims to few tens of entries more than specified length to trim - * @return Stream ID + /* + * Use add(StreamAddArgs) method instead + * */ + @Deprecated Single addAll(Map entries, int trimLen, boolean trimStrict); - /** - * Appends new entries by specified Stream ID. - * Trims stream to a specified trimLen size. - * If trimStrict is false then trims to few tens of entries more than specified length to trim. - * - * @param id - Stream ID - * @param entries - entries to add - * @param trimLen - length to trim - * @param trimStrict - if false then trims to few tens of entries more than specified length to trim - * @return void + /* + * Use add(StreamMessageId, StreamAddArgs) method instead + * */ + @Deprecated Completable addAll(StreamMessageId id, Map entries, int trimLen, boolean trimStrict); /** @@ -823,7 +809,35 @@ public interface RStreamRx extends RExpirableRx { * @return number of deleted messages */ Single trimNonStrict(int size); - + + /** + * Trims stream to specified size + * + * @param strategy - trim strategy + * @param threshold - new size of stream + * @return number of deleted messages + */ + Single trim(TrimStrategy strategy, int threshold); + + /** + * Trims stream using almost exact trimming threshold. + * + * @param strategy - trim strategy + * @param threshold - trim threshold + * @return number of deleted messages + */ + Single trimNonStrict(TrimStrategy strategy, int threshold); + + /** + * Trims stream using almost exact trimming threshold up to limit. + * + * @param strategy - trim strategy + * @param threshold - trim threshold + * @param limit - trim limit + * @return number of deleted messages + */ + Single trimNonStrict(TrimStrategy strategy, int threshold, int limit); + /** * Returns information about this stream. *