diff --git a/redisson/src/main/java/org/redisson/RedissonStream.java b/redisson/src/main/java/org/redisson/RedissonStream.java index 3855c236f..95a59bff7 100644 --- a/redisson/src/main/java/org/redisson/RedissonStream.java +++ b/redisson/src/main/java/org/redisson/RedissonStream.java @@ -684,4 +684,24 @@ public class RedissonStream extends RedissonExpirable implements RStream trimAsync(int count) { + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XTRIM, "MAXLEN", count); + } + + @Override + public RFuture trimNonStrictAsync(int count) { + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XTRIM, "MAXLEN", "~", count); + } + + @Override + public long trim(int count) { + return get(trimAsync(count)); + } + + @Override + public long trimNonStrict(int count) { + return get(trimNonStrictAsync(count)); + } + } diff --git a/redisson/src/main/java/org/redisson/api/RStream.java b/redisson/src/main/java/org/redisson/api/RStream.java index 8be730a27..2d8ecdc38 100644 --- a/redisson/src/main/java/org/redisson/api/RStream.java +++ b/redisson/src/main/java/org/redisson/api/RStream.java @@ -482,5 +482,21 @@ public interface RStream extends RStreamAsync, RExpirable { * @return deleted messages amount */ long remove(StreamMessageId... ids); + + /** + * Trims stream to specified size + * + * @param size - new size of stream + * @return number of deleted messages + */ + long trim(int size); + + /** + * Trims stream to few tens of entries more than specified length to trim. + * + * @param size - new size of stream + * @return number of deleted messages + */ + long trimNonStrict(int size); } diff --git a/redisson/src/main/java/org/redisson/api/RStreamAsync.java b/redisson/src/main/java/org/redisson/api/RStreamAsync.java index df535093a..9194b3659 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamAsync.java +++ b/redisson/src/main/java/org/redisson/api/RStreamAsync.java @@ -488,5 +488,21 @@ public interface RStreamAsync extends RExpirableAsync { * @return deleted messages amount */ RFuture removeAsync(StreamMessageId... ids); + + /** + * Trims stream to specified size + * + * @param size - new size of stream + * @return number of deleted messages + */ + RFuture trimAsync(int size); + + /** + * Trims stream to few tens of entries more than specified length to trim. + * + * @param size - new size of stream + * @return number of deleted messages + */ + RFuture trimNonStrictAsync(int size); } diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 4d80abd8f..f575e3928 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -408,6 +408,7 @@ public interface RedisCommands { RedisStrictCommand XLEN = new RedisStrictCommand("XLEN"); RedisStrictCommand XACK = new RedisStrictCommand("XACK"); RedisStrictCommand XDEL = new RedisStrictCommand("XDEL"); + RedisStrictCommand XTRIM = new RedisStrictCommand("XTRIM"); RedisCommand XPENDING = new RedisCommand("XPENDING", new ListMultiDecoder(new ObjectListReplayDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new PendingResultDecoder())); RedisCommand XPENDING_ENTRIES = new RedisCommand("XPENDING",