diff --git a/redisson/src/main/java/org/redisson/RedissonStream.java b/redisson/src/main/java/org/redisson/RedissonStream.java index 99ea4d307..ab92e1a38 100644 --- a/redisson/src/main/java/org/redisson/RedissonStream.java +++ b/redisson/src/main/java/org/redisson/RedissonStream.java @@ -130,6 +130,16 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, startId, endId, count); } + @Override + public RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) { + return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, "IDLE", idleTimeUnit.toMillis(idleTime), startId, endId, count); + } + + @Override + public RFuture<List<PendingEntry>> listPendingAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) { + return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, "IDLE", idleTimeUnit.toMillis(idleTime), startId, endId, count, consumerName); + } + @Override public List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count) { return get(listPendingAsync(groupName, startId, endId, count)); @@ -140,6 +150,16 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K return get(listPendingAsync(groupName, consumerName, startId, endId, count)); } + @Override + public List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) { + return get(listPendingAsync(groupName, startId, endId, idleTime, idleTimeUnit, count)); + } + + @Override + public List<PendingEntry> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) { + return get(listPendingAsync(groupName, consumerName, startId, endId, idleTime, idleTimeUnit, count)); + } + @Override public List<StreamMessageId> fastClaim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId... ids) { @@ -1004,10 +1024,50 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K "table.insert(result, value[1]);" + "end; " + "return result;", - Collections.<Object>singletonList(getName()), + Collections.singletonList(getName()), groupName, startId, endId, count, consumerName); } + @Override + public RFuture<Map<StreamMessageId, Map<K, V>>> pendingRangeAsync(String groupName, StreamMessageId startId, + StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) { + return commandExecutor.evalReadAsync(getName(), codec, EVAL_XRANGE, + "local pendingData = redis.call('xpending', KEYS[1], ARGV[1], 'IDLE', ARGV[2], ARGV[3], ARGV[4], ARGV[5]);" + + "local result = {}; " + + "for i = 1, #pendingData, 1 do " + + "local value = redis.call('xrange', KEYS[1], pendingData[i][1], pendingData[i][1]);" + + "table.insert(result, value[1]);" + + "end; " + + "return result;", + Collections.singletonList(getName()), + groupName, idleTimeUnit.toMillis(idleTime), startId, endId, count); + } + + @Override + public RFuture<Map<StreamMessageId, Map<K, V>>> pendingRangeAsync(String groupName, String consumerName, + StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) { + return commandExecutor.evalReadAsync(getName(), codec, EVAL_XRANGE, + "local pendingData = redis.call('xpending', KEYS[1], ARGV[1], 'IDLE', ARGV[2], ARGV[3], ARGV[4], ARGV[5], ARGV[6]);" + + "local result = {}; " + + "for i = 1, #pendingData, 1 do " + + "local value = redis.call('xrange', KEYS[1], pendingData[i][1], pendingData[i][1]);" + + "table.insert(result, value[1]);" + + "end; " + + "return result;", + Collections.singletonList(getName()), + groupName, idleTimeUnit.toMillis(idleTime), startId, endId, count, consumerName); + } + + @Override + public Map<StreamMessageId, Map<K, V>> pendingRange(String groupName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) { + return get(pendingRangeAsync(groupName, startId, endId, idleTime, idleTimeUnit, count)); + } + + @Override + public Map<StreamMessageId, Map<K, V>> pendingRange(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) { + return get(pendingRangeAsync(groupName, consumerName, startId, endId, idleTime, idleTimeUnit, count)); + } + @Override public Map<StreamMessageId, Map<K, V>> pendingRange(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count) { diff --git a/redisson/src/main/java/org/redisson/api/RStream.java b/redisson/src/main/java/org/redisson/api/RStream.java index 698677ca7..0273e2417 100644 --- a/redisson/src/main/java/org/redisson/api/RStream.java +++ b/redisson/src/main/java/org/redisson/api/RStream.java @@ -133,6 +133,49 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable { */ List<PendingEntry> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count); + /** + * Returns stream data of pending messages by group name. + * Limited by minimum idle time, messages count, start and end Stream Message IDs. + * <p> + * {@link StreamMessageId#MAX} is used as max Stream Message ID + * {@link StreamMessageId#MIN} is used as min Stream Message ID + * <p> + * Requires <b>Redis 6.2.0 and higher.</b> + * + * @see #listPendingAsync + * + * @param groupName - name of group + * @param startId - start Stream Message ID + * @param endId - end Stream Message ID + * @param idleTime - minimum idle time of messages + * @param idleTimeUnit - idle time unit + * @param count - amount of messages + * @return map + */ + List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count); + + /** + * Returns stream data of pending messages by group and customer name. + * Limited by minimum idle time, messages count, start and end Stream Message IDs. + * <p> + * {@link StreamMessageId#MAX} is used as max Stream Message ID + * {@link StreamMessageId#MIN} is used as min Stream Message ID + * <p> + * Requires <b>Redis 6.2.0 and higher.</b> + * + * @see #listPendingAsync + * + * @param consumerName - name of consumer + * @param groupName - name of group + * @param startId - start Stream Message ID + * @param endId - end Stream Message ID + * @param idleTime - minimum idle time of messages + * @param idleTimeUnit - idle time unit + * @param count - amount of messages + * @return map + */ + List<PendingEntry> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count); + /** * Returns stream data of pending messages by group name. * Limited by start Stream Message ID and end Stream Message ID and count. @@ -167,7 +210,50 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable { * @return map */ Map<StreamMessageId, Map<K, V>> pendingRange(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count); - + + /** + * Returns stream data of pending messages by group name. + * Limited by minimum idle time, messages count, start and end Stream Message IDs. + * <p> + * {@link StreamMessageId#MAX} is used as max Stream Message ID + * {@link StreamMessageId#MIN} is used as min Stream Message ID + * <p> + * Requires <b>Redis 6.2.0 and higher.</b> + * + * @see #listPendingAsync + * + * @param groupName - name of group + * @param startId - start Stream Message ID + * @param endId - end Stream Message ID + * @param idleTime - minimum idle time of messages + * @param idleTimeUnit - idle time unit + * @param count - amount of messages + * @return map + */ + Map<StreamMessageId, Map<K, V>> pendingRange(String groupName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count); + + /** + * Returns stream data of pending messages by group and customer name. + * Limited by minimum idle time, messages count, start and end Stream Message IDs. + * <p> + * {@link StreamMessageId#MAX} is used as max Stream Message ID + * {@link StreamMessageId#MIN} is used as min Stream Message ID + * <p> + * Requires <b>Redis 6.2.0 and higher.</b> + * + * @see #listPendingAsync + * + * @param consumerName - name of consumer + * @param groupName - name of group + * @param startId - start Stream Message ID + * @param endId - end Stream Message ID + * @param idleTime - minimum idle time of messages + * @param idleTimeUnit - idle time unit + * @param count - amount of messages + * @return map + */ + Map<StreamMessageId, Map<K, V>> pendingRange(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count); + /** * Transfers ownership of pending messages by id to a new consumer * by name if idle time of messages is greater than defined value. diff --git a/redisson/src/main/java/org/redisson/api/RStreamAsync.java b/redisson/src/main/java/org/redisson/api/RStreamAsync.java index 18acd9ceb..45662515c 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamAsync.java +++ b/redisson/src/main/java/org/redisson/api/RStreamAsync.java @@ -102,13 +102,56 @@ public interface RStreamAsync<K, V> extends RExpirableAsync { /** * Returns list of common info about pending messages by group name. - * Limited by start Stream Message ID and end Stream Message ID and count. + * Limited by minimum idle time, messages count, start and end Stream Message IDs. * <p> * {@link StreamMessageId#MAX} is used as max Stream Message ID * {@link StreamMessageId#MIN} is used as min Stream Message ID + * <p> + * Requires <b>Redis 6.2.0 and higher.</b> + * + * @see #pendingRangeAsync * + * @param groupName - name of group + * @param startId - start Stream Message ID + * @param idleTime - minimum idle time of messages + * @param idleTimeUnit - idle time unit + * @param endId - end Stream Message ID + * @param count - amount of messages + * @return list + */ + RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count); + + /** + * Returns list of common info about pending messages by group and consumer name. + * Limited by minimum idle time, messages count, start and end Stream Message IDs. + * <p> + * {@link StreamMessageId#MAX} is used as max Stream Message ID + * {@link StreamMessageId#MIN} is used as min Stream Message ID + * <p> + * Requires <b>Redis 6.2.0 and higher.</b> + * * @see #pendingRangeAsync * + * @param consumerName - name of consumer + * @param groupName - name of group + * @param startId - start Stream Message ID + * @param endId - end Stream Message ID + * @param idleTime - minimum idle time of messages + * @param idleTimeUnit - idle time unit + * @param count - amount of messages + * @return list + */ + RFuture<List<PendingEntry>> listPendingAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count); + + /** + * Returns list of common info about pending messages by group name. + * Limited by start Stream Message ID and end Stream Message ID and count. + * <p> + * {@link StreamMessageId#MAX} is used as max Stream Message ID + * {@link StreamMessageId#MIN} is used as min Stream Message ID + * + * @see #pendingRangeAsync + * * @param groupName - name of group * @param startId - start Stream Message ID * @param endId - end Stream Message ID @@ -116,16 +159,16 @@ public interface RStreamAsync<K, V> extends RExpirableAsync { * @return list */ RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, int count); - + /** * Returns list of common info about pending messages by group and consumer name. * Limited by start Stream Message ID and end Stream Message ID and count. * <p> * {@link StreamMessageId#MAX} is used as max Stream Message ID * {@link StreamMessageId#MIN} is used as min Stream Message ID - * + * * @see #pendingRangeAsync - * + * * @param consumerName - name of consumer * @param groupName - name of group * @param startId - start Stream Message ID @@ -170,6 +213,49 @@ public interface RStreamAsync<K, V> extends RExpirableAsync { */ RFuture<Map<StreamMessageId, Map<K, V>>> pendingRangeAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count); + /** + * Returns stream data of pending messages by group name. + * Limited by minimum idle time, messages count, start and end Stream Message IDs. + * <p> + * {@link StreamMessageId#MAX} is used as max Stream Message ID + * {@link StreamMessageId#MIN} is used as min Stream Message ID + * <p> + * Requires <b>Redis 6.2.0 and higher.</b> + * + * @see #listPendingAsync + * + * @param groupName - name of group + * @param startId - start Stream Message ID + * @param endId - end Stream Message ID + * @param idleTime - minimum idle time of messages + * @param idleTimeUnit - idle time unit + * @param count - amount of messages + * @return map + */ + RFuture<Map<StreamMessageId, Map<K, V>>> pendingRangeAsync(String groupName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count); + + /** + * Returns stream data of pending messages by group and customer name. + * Limited by minimum idle time, messages count, start and end Stream Message IDs. + * <p> + * {@link StreamMessageId#MAX} is used as max Stream Message ID + * {@link StreamMessageId#MIN} is used as min Stream Message ID + * <p> + * Requires <b>Redis 6.2.0 and higher.</b> + * + * @see #listPendingAsync + * + * @param consumerName - name of consumer + * @param groupName - name of group + * @param startId - start Stream Message ID + * @param endId - end Stream Message ID + * @param idleTime - minimum idle time of messages + * @param idleTimeUnit - idle time unit + * @param count - amount of messages + * @return map + */ + RFuture<Map<StreamMessageId, Map<K, V>>> pendingRangeAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count); + /** * Transfers ownership of pending messages by id to a new consumer * by name if idle time of messages is greater than defined value. diff --git a/redisson/src/test/java/org/redisson/RedissonStreamTest.java b/redisson/src/test/java/org/redisson/RedissonStreamTest.java index c9c796c44..f11429f23 100644 --- a/redisson/src/test/java/org/redisson/RedissonStreamTest.java +++ b/redisson/src/test/java/org/redisson/RedissonStreamTest.java @@ -21,6 +21,43 @@ import org.redisson.client.RedisException; public class RedissonStreamTest extends BaseTest { + @Test + public void testPendingIdle() { + RStream<String, String> stream = redisson.getStream("test"); + + stream.add("0", "0"); + + stream.createGroup("testGroup"); + + StreamMessageId id1 = stream.add("1", "1"); + StreamMessageId id2 = stream.add("2", "2"); + + Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1"); + assertThat(s.size()).isEqualTo(2); + + StreamMessageId id3 = stream.add("3", "3"); + StreamMessageId id4 = stream.add("4", "4"); + + Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2"); + assertThat(s2.size()).isEqualTo(2); + + List<PendingEntry> list = stream.listPending("testGroup", StreamMessageId.MIN, StreamMessageId.MAX, 1, TimeUnit.MILLISECONDS, 10); + assertThat(list.size()).isEqualTo(4); + for (PendingEntry pendingEntry : list) { + assertThat(pendingEntry.getId()).isIn(id1, id2, id3, id4); + assertThat(pendingEntry.getConsumerName()).isIn("consumer1", "consumer2"); + assertThat(pendingEntry.getLastTimeDelivered()).isOne(); + } + + List<PendingEntry> list2 = stream.listPending("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 1, TimeUnit.MILLISECONDS,10); + assertThat(list2.size()).isEqualTo(2); + for (PendingEntry pendingEntry : list2) { + assertThat(pendingEntry.getId()).isIn(id1, id2); + assertThat(pendingEntry.getConsumerName()).isEqualTo("consumer1"); + assertThat(pendingEntry.getLastTimeDelivered()).isOne(); + } + } + @Test public void testTrim() { RStream<String, String> stream = redisson.getStream("test");