Feature - Add min idle time parameter to listPending() and pendingRange() methods of RStream object #3341

pull/3355/head
Nikita Koksharov 4 years ago
parent 9e072d58d7
commit e12f8fe270

@ -130,6 +130,16 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, startId, endId, count);
}
@Override
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, "IDLE", idleTimeUnit.toMillis(idleTime), startId, endId, count);
}
@Override
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, "IDLE", idleTimeUnit.toMillis(idleTime), startId, endId, count, consumerName);
}
@Override
public List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count) {
return get(listPendingAsync(groupName, startId, endId, count));
@ -140,6 +150,16 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
return get(listPendingAsync(groupName, consumerName, startId, endId, count));
}
@Override
public List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) {
return get(listPendingAsync(groupName, startId, endId, idleTime, idleTimeUnit, count));
}
@Override
public List<PendingEntry> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) {
return get(listPendingAsync(groupName, consumerName, startId, endId, idleTime, idleTimeUnit, count));
}
@Override
public List<StreamMessageId> fastClaim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit,
StreamMessageId... ids) {
@ -1004,10 +1024,50 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
"table.insert(result, value[1]);" +
"end; " +
"return result;",
Collections.<Object>singletonList(getName()),
Collections.singletonList(getName()),
groupName, startId, endId, count, consumerName);
}
@Override
public RFuture<Map<StreamMessageId, Map<K, V>>> pendingRangeAsync(String groupName, StreamMessageId startId,
StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) {
return commandExecutor.evalReadAsync(getName(), codec, EVAL_XRANGE,
"local pendingData = redis.call('xpending', KEYS[1], ARGV[1], 'IDLE', ARGV[2], ARGV[3], ARGV[4], ARGV[5]);" +
"local result = {}; " +
"for i = 1, #pendingData, 1 do " +
"local value = redis.call('xrange', KEYS[1], pendingData[i][1], pendingData[i][1]);" +
"table.insert(result, value[1]);" +
"end; " +
"return result;",
Collections.singletonList(getName()),
groupName, idleTimeUnit.toMillis(idleTime), startId, endId, count);
}
@Override
public RFuture<Map<StreamMessageId, Map<K, V>>> pendingRangeAsync(String groupName, String consumerName,
StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) {
return commandExecutor.evalReadAsync(getName(), codec, EVAL_XRANGE,
"local pendingData = redis.call('xpending', KEYS[1], ARGV[1], 'IDLE', ARGV[2], ARGV[3], ARGV[4], ARGV[5], ARGV[6]);" +
"local result = {}; " +
"for i = 1, #pendingData, 1 do " +
"local value = redis.call('xrange', KEYS[1], pendingData[i][1], pendingData[i][1]);" +
"table.insert(result, value[1]);" +
"end; " +
"return result;",
Collections.singletonList(getName()),
groupName, idleTimeUnit.toMillis(idleTime), startId, endId, count, consumerName);
}
@Override
public Map<StreamMessageId, Map<K, V>> pendingRange(String groupName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) {
return get(pendingRangeAsync(groupName, startId, endId, idleTime, idleTimeUnit, count));
}
@Override
public Map<StreamMessageId, Map<K, V>> pendingRange(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count) {
return get(pendingRangeAsync(groupName, consumerName, startId, endId, idleTime, idleTimeUnit, count));
}
@Override
public Map<StreamMessageId, Map<K, V>> pendingRange(String groupName, String consumerName, StreamMessageId startId,
StreamMessageId endId, int count) {

@ -133,6 +133,49 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
*/
List<PendingEntry> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Returns stream data of pending messages by group name.
* Limited by minimum idle time, messages count, start and end Stream Message IDs.
* <p>
* {@link StreamMessageId#MAX} is used as max Stream Message ID
* {@link StreamMessageId#MIN} is used as min Stream Message ID
* <p>
* Requires <b>Redis 6.2.0 and higher.</b>
*
* @see #listPendingAsync
*
* @param groupName - name of group
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param count - amount of messages
* @return map
*/
List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count);
/**
* Returns stream data of pending messages by group and customer name.
* Limited by minimum idle time, messages count, start and end Stream Message IDs.
* <p>
* {@link StreamMessageId#MAX} is used as max Stream Message ID
* {@link StreamMessageId#MIN} is used as min Stream Message ID
* <p>
* Requires <b>Redis 6.2.0 and higher.</b>
*
* @see #listPendingAsync
*
* @param consumerName - name of consumer
* @param groupName - name of group
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param count - amount of messages
* @return map
*/
List<PendingEntry> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count);
/**
* Returns stream data of pending messages by group name.
* Limited by start Stream Message ID and end Stream Message ID and count.
@ -168,6 +211,49 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
*/
Map<StreamMessageId, Map<K, V>> pendingRange(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Returns stream data of pending messages by group name.
* Limited by minimum idle time, messages count, start and end Stream Message IDs.
* <p>
* {@link StreamMessageId#MAX} is used as max Stream Message ID
* {@link StreamMessageId#MIN} is used as min Stream Message ID
* <p>
* Requires <b>Redis 6.2.0 and higher.</b>
*
* @see #listPendingAsync
*
* @param groupName - name of group
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param count - amount of messages
* @return map
*/
Map<StreamMessageId, Map<K, V>> pendingRange(String groupName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count);
/**
* Returns stream data of pending messages by group and customer name.
* Limited by minimum idle time, messages count, start and end Stream Message IDs.
* <p>
* {@link StreamMessageId#MAX} is used as max Stream Message ID
* {@link StreamMessageId#MIN} is used as min Stream Message ID
* <p>
* Requires <b>Redis 6.2.0 and higher.</b>
*
* @see #listPendingAsync
*
* @param consumerName - name of consumer
* @param groupName - name of group
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param count - amount of messages
* @return map
*/
Map<StreamMessageId, Map<K, V>> pendingRange(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count);
/**
* Transfers ownership of pending messages by id to a new consumer
* by name if idle time of messages is greater than defined value.

@ -100,6 +100,49 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
@Deprecated
RFuture<PendingResult> listPendingAsync(String groupName);
/**
* Returns list of common info about pending messages by group name.
* Limited by minimum idle time, messages count, start and end Stream Message IDs.
* <p>
* {@link StreamMessageId#MAX} is used as max Stream Message ID
* {@link StreamMessageId#MIN} is used as min Stream Message ID
* <p>
* Requires <b>Redis 6.2.0 and higher.</b>
*
* @see #pendingRangeAsync
*
* @param groupName - name of group
* @param startId - start Stream Message ID
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param endId - end Stream Message ID
* @param count - amount of messages
* @return list
*/
RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count);
/**
* Returns list of common info about pending messages by group and consumer name.
* Limited by minimum idle time, messages count, start and end Stream Message IDs.
* <p>
* {@link StreamMessageId#MAX} is used as max Stream Message ID
* {@link StreamMessageId#MIN} is used as min Stream Message ID
* <p>
* Requires <b>Redis 6.2.0 and higher.</b>
*
* @see #pendingRangeAsync
*
* @param consumerName - name of consumer
* @param groupName - name of group
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param count - amount of messages
* @return list
*/
RFuture<List<PendingEntry>> listPendingAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count);
/**
* Returns list of common info about pending messages by group name.
* Limited by start Stream Message ID and end Stream Message ID and count.
@ -170,6 +213,49 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
*/
RFuture<Map<StreamMessageId, Map<K, V>>> pendingRangeAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Returns stream data of pending messages by group name.
* Limited by minimum idle time, messages count, start and end Stream Message IDs.
* <p>
* {@link StreamMessageId#MAX} is used as max Stream Message ID
* {@link StreamMessageId#MIN} is used as min Stream Message ID
* <p>
* Requires <b>Redis 6.2.0 and higher.</b>
*
* @see #listPendingAsync
*
* @param groupName - name of group
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param count - amount of messages
* @return map
*/
RFuture<Map<StreamMessageId, Map<K, V>>> pendingRangeAsync(String groupName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count);
/**
* Returns stream data of pending messages by group and customer name.
* Limited by minimum idle time, messages count, start and end Stream Message IDs.
* <p>
* {@link StreamMessageId#MAX} is used as max Stream Message ID
* {@link StreamMessageId#MIN} is used as min Stream Message ID
* <p>
* Requires <b>Redis 6.2.0 and higher.</b>
*
* @see #listPendingAsync
*
* @param consumerName - name of consumer
* @param groupName - name of group
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param count - amount of messages
* @return map
*/
RFuture<Map<StreamMessageId, Map<K, V>>> pendingRangeAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, long idleTime, TimeUnit idleTimeUnit, int count);
/**
* Transfers ownership of pending messages by id to a new consumer
* by name if idle time of messages is greater than defined value.

@ -21,6 +21,43 @@ import org.redisson.client.RedisException;
public class RedissonStreamTest extends BaseTest {
@Test
public void testPendingIdle() {
RStream<String, String> stream = redisson.getStream("test");
stream.add("0", "0");
stream.createGroup("testGroup");
StreamMessageId id1 = stream.add("1", "1");
StreamMessageId id2 = stream.add("2", "2");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
StreamMessageId id3 = stream.add("3", "3");
StreamMessageId id4 = stream.add("4", "4");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
assertThat(s2.size()).isEqualTo(2);
List<PendingEntry> list = stream.listPending("testGroup", StreamMessageId.MIN, StreamMessageId.MAX, 1, TimeUnit.MILLISECONDS, 10);
assertThat(list.size()).isEqualTo(4);
for (PendingEntry pendingEntry : list) {
assertThat(pendingEntry.getId()).isIn(id1, id2, id3, id4);
assertThat(pendingEntry.getConsumerName()).isIn("consumer1", "consumer2");
assertThat(pendingEntry.getLastTimeDelivered()).isOne();
}
List<PendingEntry> list2 = stream.listPending("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 1, TimeUnit.MILLISECONDS,10);
assertThat(list2.size()).isEqualTo(2);
for (PendingEntry pendingEntry : list2) {
assertThat(pendingEntry.getId()).isIn(id1, id2);
assertThat(pendingEntry.getConsumerName()).isEqualTo("consumer1");
assertThat(pendingEntry.getLastTimeDelivered()).isOne();
}
}
@Test
public void testTrim() {
RStream<String, String> stream = redisson.getStream("test");

Loading…
Cancel
Save