RStream.getPendingInfo method added to RStream, RStreamAsync, RStreamRx, RStreamReactive interfaces.

pull/2160/head^2
Nikita Koksharov 6 years ago
parent 1000e0f39a
commit 09c2672ed4

@ -115,14 +115,24 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
@Override
public RFuture<PendingResult> listPendingAsync(String groupName) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING, getName(), groupName);
return getPendingInfoAsync(groupName);
}
@Override
public PendingResult listPending(String groupName) {
return get(listPendingAsync(groupName));
return getPendingInfo(groupName);
}
@Override
public RFuture<PendingResult> getPendingInfoAsync(String groupName) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING, getName(), groupName);
}
@Override
public PendingResult getPendingInfo(String groupName) {
return get(listPendingAsync(groupName));
}
@Override
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, startId, endId, count, consumerName);

@ -83,17 +83,23 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @return marked messages amount
*/
long ack(String groupName, StreamMessageId... ids);
/**
* Returns pending messages by group name
* Returns common info about pending messages by group name.
*
* @param groupName - name of group
* @return result object
*/
PendingResult getPendingInfo(String groupName);
/*
* Use #getPendingInfo method
*/
@Deprecated
PendingResult listPending(String groupName);
/**
* Returns list of pending messages by group name.
* Returns list of common info about pending messages by group name.
* Limited by start Stream Message ID and end Stream Message ID and count.
* <p>
* {@link StreamMessageId#MAX} is used as max Stream Message ID
@ -108,7 +114,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Returns list of pending messages by group name and consumer name.
* Returns list of common info about pending messages by group and consumer name.
* Limited by start Stream Message ID and end Stream Message ID and count.
* <p>
* {@link StreamMessageId#MAX} is used as max Stream Message ID
@ -724,14 +730,14 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
StreamInfo<K, V> getInfo();
/**
* Returns list of objects with information about groups belonging to this stream.
* Returns list of common info about groups belonging to this stream.
*
* @return list of info objects
*/
List<StreamGroup> listGroups();
/**
* Returns list of objects with information about group customers for specified <code>groupName</code>.
* Returns list of common info about group customers for specified <code>groupName</code>.
*
* @param groupName - name of group
* @return list of info objects

@ -85,13 +85,19 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @return marked messages amount
*/
RFuture<Long> ackAsync(String groupName, StreamMessageId... ids);
/**
* Returns pending messages by group name
* Returns common info about pending messages by group name.
*
* @param groupName - name of group
* @return result object
*/
RFuture<PendingResult> getPendingInfoAsync(String groupName);
/*
* Use #getPendingInfoAsync method
*/
@Deprecated
RFuture<PendingResult> listPendingAsync(String groupName);
/**

@ -89,11 +89,17 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
Mono<Long> ack(String groupName, StreamMessageId... ids);
/**
* Returns pending messages by group name
* Returns common info about pending messages by group name.
*
* @param groupName - name of group
* @return result object
*/
Mono<PendingResult> getPendingInfo(String groupName);
/*
* Use #getPendingInfo method
*/
@Deprecated
Mono<PendingResult> listPending(String groupName);
/**

@ -90,11 +90,17 @@ public interface RStreamRx<K, V> extends RExpirableRx {
Single<Long> ack(String groupName, StreamMessageId... ids);
/**
* Returns pending messages by group name
* Returns common info about pending messages by group name.
*
* @param groupName - name of group
* @return result object
*/
Single<PendingResult> getPendingInfo(String groupName);
/*
* Use #getPendingInfo method
*/
@Deprecated
Single<PendingResult> listPending(String groupName);
/**

Loading…
Cancel
Save