diff --git a/redisson/src/main/java/org/redisson/RedissonStream.java b/redisson/src/main/java/org/redisson/RedissonStream.java index 2c5e26d56..eb383091d 100644 --- a/redisson/src/main/java/org/redisson/RedissonStream.java +++ b/redisson/src/main/java/org/redisson/RedissonStream.java @@ -27,7 +27,7 @@ import org.redisson.api.PendingEntry; import org.redisson.api.PendingResult; import org.redisson.api.RFuture; import org.redisson.api.RStream; -import org.redisson.api.StreamId; +import org.redisson.api.StreamMessageId; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; @@ -69,25 +69,25 @@ public class RedissonStream extends RedissonExpirable implements RStream createGroupAsync(String groupName) { - return createGroupAsync(groupName, StreamId.NEWEST); + return createGroupAsync(groupName, StreamMessageId.NEWEST); } @Override - public void createGroup(String groupName, StreamId id) { + public void createGroup(String groupName, StreamMessageId id) { get(createGroupAsync(groupName, id)); } @Override - public RFuture createGroupAsync(String groupName, StreamId id) { + public RFuture createGroupAsync(String groupName, StreamMessageId id) { return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "CREATE", getName(), groupName, id); } @Override - public RFuture ackAsync(String groupName, StreamId... ids) { + public RFuture ackAsync(String groupName, StreamMessageId... ids) { List params = new ArrayList(); params.add(getName()); params.add(groupName); - for (StreamId id : ids) { + for (StreamMessageId id : ids) { params.add(id); } @@ -95,7 +95,7 @@ public class RedissonStream extends RedissonExpirable implements RStream extends RedissonExpirable implements RStream> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count, - String consumerName) { + public RFuture> listPendingAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count) { return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, startId, endId, count, consumerName); } @Override - public RFuture> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count) { + public RFuture> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, int count) { return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, startId, endId, count); } @Override - public List listPending(String groupName, StreamId startId, StreamId endId, int count) { + public List listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count) { return get(listPendingAsync(groupName, startId, endId, count)); } @Override - public List listPending(String groupName, StreamId startId, StreamId endId, int count, - String consumerName) { - return get(listPendingAsync(groupName, startId, endId, count, consumerName)); + public List listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count) { + return get(listPendingAsync(groupName, consumerName, startId, endId, count)); } @Override - public RFuture>> claimAsync(String groupName, String consumerName, long idleTime, - TimeUnit idleTimeUnit, StreamId... ids) { + public List fastClaim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, + StreamMessageId... ids) { + return get(fastClaimAsync(groupName, consumerName, idleTime, idleTimeUnit, ids)); + } + + @Override + public RFuture> fastClaimAsync(String groupName, String consumerName, long idleTime, + TimeUnit idleTimeUnit, StreamMessageId... ids) { List params = new ArrayList(); params.add(getName()); params.add(groupName); params.add(consumerName); params.add(idleTimeUnit.toMillis(idleTime)); - for (StreamId id : ids) { + for (StreamMessageId id : ids) { params.add(id.toString()); } - + + params.add("JUSTID"); + + return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XCLAIM_IDS, params.toArray()); + } + + @Override + public RFuture>> claimAsync(String groupName, String consumerName, long idleTime, + TimeUnit idleTimeUnit, StreamMessageId... ids) { + List params = new ArrayList(); + params.add(getName()); + params.add(groupName); + params.add(consumerName); + params.add(idleTimeUnit.toMillis(idleTime)); + + for (StreamMessageId id : ids) { + params.add(id.toString()); + } + return commandExecutor.readAsync(getName(), codec, RedisCommands.XCLAIM, params.toArray()); } @Override - public Map> claimPending(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, - StreamId... ids) { + public Map> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, + StreamMessageId... ids) { return get(claimAsync(groupName, consumerName, idleTime, idleTimeUnit, ids)); } @Override - public RFuture>> readGroupAsync(String groupName, String consumerName, StreamId... ids) { + public RFuture>> readGroupAsync(String groupName, String consumerName, StreamMessageId... ids) { return readGroupAsync(groupName, consumerName, 0, ids); } @Override - public RFuture>> readGroupAsync(String groupName, String consumerName, int count, StreamId... ids) { + public RFuture>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId... ids) { return readGroupAsync(groupName, consumerName, count, 0, null, ids); } @Override - public RFuture>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, - StreamId... ids) { + public RFuture>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, + StreamMessageId... ids) { return readGroupAsync(groupName, consumerName, 0, timeout, unit, ids); } @Override - public RFuture>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, - StreamId... ids) { + public RFuture>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, + StreamMessageId... ids) { List params = new ArrayList(); params.add("GROUP"); params.add(groupName); @@ -194,7 +216,7 @@ public class RedissonStream extends RedissonExpirable implements RStream"); } - for (StreamId id : ids) { + for (StreamMessageId id : ids) { params.add(id.toString()); } @@ -205,62 +227,62 @@ public class RedissonStream extends RedissonExpirable implements RStream> readGroup(String groupName, String consumerName, StreamId... ids) { + public Map> readGroup(String groupName, String consumerName, StreamMessageId... ids) { return get(readGroupAsync(groupName, consumerName, ids)); } @Override - public Map> readGroup(String groupName, String consumerName, int count, StreamId... ids) { + public Map> readGroup(String groupName, String consumerName, int count, StreamMessageId... ids) { return get(readGroupAsync(groupName, consumerName, count, ids)); } @Override - public Map> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId... ids) { + public Map> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId... ids) { return get(readGroupAsync(groupName, consumerName, timeout, unit, ids)); } @Override - public Map> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, - StreamId... ids) { + public Map> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, + StreamMessageId... ids) { return get(readGroupAsync(groupName, consumerName, count, timeout, unit, ids)); } @Override - public StreamId addAll(Map entries) { + public StreamMessageId addAll(Map entries) { return addAll(entries, 0, false); } @Override - public RFuture addAllAsync(Map entries) { + public RFuture addAllAsync(Map entries) { return addAllAsync(entries, 0, false); } @Override - public void addAll(StreamId id, Map entries) { + public void addAll(StreamMessageId id, Map entries) { addAll(id, entries, 0, false); } @Override - public RFuture addAllAsync(StreamId id, Map entries) { + public RFuture addAllAsync(StreamMessageId id, Map entries) { return addAllAsync(id, entries, 0, false); } @Override - public StreamId addAll(Map entries, int trimLen, boolean trimStrict) { + public StreamMessageId addAll(Map entries, int trimLen, boolean trimStrict) { return get(addAllAsync(entries, trimLen, trimStrict)); } @Override - public RFuture addAllAsync(Map entries, int trimLen, boolean trimStrict) { + public RFuture addAllAsync(Map entries, int trimLen, boolean trimStrict) { return addAllCustomAsync(null, entries, trimLen, trimStrict); } @Override - public void addAll(StreamId id, Map entries, int trimLen, boolean trimStrict) { + public void addAll(StreamMessageId id, Map entries, int trimLen, boolean trimStrict) { get(addAllAsync(id, entries, trimLen, trimStrict)); } - private RFuture addAllCustomAsync(StreamId id, Map entries, int trimLen, boolean trimStrict) { + private RFuture addAllCustomAsync(StreamMessageId id, Map entries, int trimLen, boolean trimStrict) { List params = new ArrayList(entries.size()*2 + 1); params.add(getName()); @@ -293,7 +315,7 @@ public class RedissonStream extends RedissonExpirable implements RStream addAllAsync(StreamId id, Map entries, int trimLen, boolean trimStrict) { + public RFuture addAllAsync(StreamMessageId id, Map entries, int trimLen, boolean trimStrict) { return addAllCustomAsync(id, entries, trimLen, trimStrict); } @@ -308,22 +330,22 @@ public class RedissonStream extends RedissonExpirable implements RStream> read(int count, StreamId ... ids) { + public Map> read(int count, StreamMessageId ... ids) { return get(readAsync(count, ids)); } @Override - public RFuture>>> readAsync(int count, StreamId id, Map keyToId) { + public RFuture>>> readAsync(int count, StreamMessageId id, Map keyToId) { return readAsync(count, -1, null, id, keyToId); } @Override - public Map> read(int count, long timeout, TimeUnit unit, StreamId... ids) { + public Map> read(int count, long timeout, TimeUnit unit, StreamMessageId... ids) { return get(readAsync(count, timeout, unit, ids)); } @Override - public RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, Map keyToId) { + public RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, Map keyToId) { List params = new ArrayList(); if (count > 0) { params.add("COUNT"); @@ -342,7 +364,7 @@ public class RedissonStream extends RedissonExpirable implements RStream extends RedissonExpirable implements RStream addAsync(K key, V value) { + public RFuture addAsync(K key, V value) { return addAsync(key, value, 0, false); } @Override - public RFuture addAsync(StreamId id, K key, V value) { + public RFuture addAsync(StreamMessageId id, K key, V value) { return addAsync(id, key, value, 0, false); } @Override - public RFuture addAsync(K key, V value, int trimLen, boolean trimStrict) { + public RFuture addAsync(K key, V value, int trimLen, boolean trimStrict) { return addCustomAsync(null, key, value, trimLen, trimStrict); } - private RFuture addCustomAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict) { + private RFuture addCustomAsync(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) { List params = new LinkedList(); params.add(getName()); @@ -398,38 +420,38 @@ public class RedissonStream extends RedissonExpirable implements RStream addAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict) { + public RFuture addAsync(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) { return addCustomAsync(id, key, value, trimLen, trimStrict); } @Override - public StreamId add(K key, V value) { + public StreamMessageId add(K key, V value) { return get(addAsync(key, value)); } @Override - public void add(StreamId id, K key, V value) { + public void add(StreamMessageId id, K key, V value) { get(addAsync(id, key, value)); } @Override - public StreamId add(K key, V value, int trimLen, boolean trimStrict) { + public StreamMessageId add(K key, V value, int trimLen, boolean trimStrict) { return get(addAsync(key, value, trimLen, trimStrict)); } @Override - public void add(StreamId id, K key, V value, int trimLen, boolean trimStrict) { + public void add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) { get(addAsync(id, key, value, trimLen, trimStrict)); } @Override - public RFuture>> readAsync(int count, StreamId... ids) { + public RFuture>> readAsync(int count, StreamMessageId... ids) { return readAsync(count, 0, null, ids); } @Override - public RFuture>> readAsync(int count, long timeout, TimeUnit unit, - StreamId... ids) { + public RFuture>> readAsync(int count, long timeout, TimeUnit unit, + StreamMessageId... ids) { List params = new ArrayList(); if (count > 0) { params.add("COUNT"); @@ -444,7 +466,7 @@ public class RedissonStream extends RedissonExpirable implements RStream extends RedissonExpirable implements RStream>> read(int count, StreamId id, Map keyToId) { + public Map>> read(int count, StreamMessageId id, Map keyToId) { return get(readAsync(count, id, keyToId)); } @Override - public Map>> read(int count, long timeout, TimeUnit unit, StreamId id, Map keyToId) { + public Map>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map keyToId) { return get(readAsync(count, timeout, unit, id, keyToId)); } @Override - public RFuture>> rangeAsync(int count, StreamId startId, StreamId endId) { + public RFuture>> rangeAsync(int count, StreamMessageId startId, StreamMessageId endId) { List params = new LinkedList(); params.add(getName()); params.add(startId); @@ -480,12 +502,12 @@ public class RedissonStream extends RedissonExpirable implements RStream> range(int count, StreamId startId, StreamId endId) { + public Map> range(int count, StreamMessageId startId, StreamMessageId endId) { return get(rangeAsync(count, startId, endId)); } @Override - public RFuture>> rangeReversedAsync(int count, StreamId startId, StreamId endId) { + public RFuture>> rangeReversedAsync(int count, StreamMessageId startId, StreamMessageId endId) { List params = new LinkedList(); params.add(getName()); params.add(startId); @@ -500,172 +522,238 @@ public class RedissonStream extends RedissonExpirable implements RStream> rangeReversed(int count, StreamId startId, StreamId endId) { + public Map> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId) { return get(rangeReversedAsync(count, startId, endId)); } @Override - public RFuture>> readAsync(StreamId... ids) { + public RFuture>> readAsync(StreamMessageId... ids) { return readAsync(0, ids); } @Override - public RFuture>> readAsync(long timeout, TimeUnit unit, StreamId... ids) { + public RFuture>> readAsync(long timeout, TimeUnit unit, StreamMessageId... ids) { return readAsync(0, timeout, unit, ids); } @Override - public RFuture>>> readAsync(StreamId id, Map keyToId) { + public RFuture>>> readAsync(StreamMessageId id, Map keyToId) { return readAsync(0, id, keyToId); } @Override - public RFuture>>> readAsync(long timeout, TimeUnit unit, StreamId id, Map keyToId) { + public RFuture>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, Map keyToId) { return readAsync(0, timeout, unit, id, keyToId); } @Override - public RFuture>> rangeAsync(StreamId startId, StreamId endId) { + public RFuture>> rangeAsync(StreamMessageId startId, StreamMessageId endId) { return rangeAsync(0, startId, endId); } @Override - public RFuture>> rangeReversedAsync(StreamId startId, StreamId endId) { + public RFuture>> rangeReversedAsync(StreamMessageId startId, StreamMessageId endId) { return rangeReversedAsync(0, startId, endId); } @Override - public Map> read(StreamId... ids) { + public Map> read(StreamMessageId... ids) { return read(0, ids); } @Override - public Map> read(long timeout, TimeUnit unit, StreamId... ids) { + public Map> read(long timeout, TimeUnit unit, StreamMessageId... ids) { return read(0, timeout, unit, ids); } @Override - public Map>> read(StreamId id, Map keyToId) { + public Map>> read(StreamMessageId id, Map keyToId) { return read(0, id, keyToId); } @Override - public Map>> read(long timeout, TimeUnit unit, StreamId id, Map keyToId) { + public Map>> read(long timeout, TimeUnit unit, StreamMessageId id, Map keyToId) { return read(0, timeout, unit, id, keyToId); } @Override - public Map> range(StreamId startId, StreamId endId) { + public Map> range(StreamMessageId startId, StreamMessageId endId) { return range(0, startId, endId); } @Override - public Map> rangeReversed(StreamId startId, StreamId endId) { + public Map> rangeReversed(StreamMessageId startId, StreamMessageId endId) { return rangeReversed(0, startId, endId); } @Override - public RFuture>>> readAsync(StreamId id, String key2, StreamId id2) { + public RFuture>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2) { return readAsync(id, Collections.singletonMap(key2, id2)); } @Override - public RFuture>>> readAsync(StreamId id, String key2, StreamId id2, String key3, - StreamId id3) { - Map params = new HashMap(2); + public RFuture>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2, String key3, + StreamMessageId id3) { + Map params = new HashMap(2); params.put(key2, id2); params.put(key3, id3); return readAsync(id, params); } @Override - public RFuture>>> readAsync(int count, StreamId id, String key2, StreamId id2) { + public RFuture>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2) { return readAsync(count, id, Collections.singletonMap(key2, id2)); } @Override - public RFuture>>> readAsync(int count, StreamId id, String key2, StreamId id2, - String key3, StreamId id3) { - Map params = new HashMap(2); + public RFuture>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2, + String key3, StreamMessageId id3) { + Map params = new HashMap(2); params.put(key2, id2); params.put(key3, id3); return readAsync(count, id, params); } @Override - public RFuture>>> readAsync(long timeout, TimeUnit unit, StreamId id, - String key2, StreamId id2) { + public RFuture>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2) { return readAsync(timeout, unit, id, Collections.singletonMap(key2, id2)); } @Override - public RFuture>>> readAsync(long timeout, TimeUnit unit, StreamId id, - String key2, StreamId id2, String key3, StreamId id3) { - Map params = new HashMap(2); + public RFuture>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2, String key3, StreamMessageId id3) { + Map params = new HashMap(2); params.put(key2, id2); params.put(key3, id3); return readAsync(timeout, unit, id, params); } @Override - public RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, - String key2, StreamId id2) { + public RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2) { return readAsync(count, timeout, unit, id, Collections.singletonMap(key2, id2)); } @Override - public RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, - String key2, StreamId id2, String key3, StreamId id3) { - Map params = new HashMap(2); + public RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2, String key3, StreamMessageId id3) { + Map params = new HashMap(2); params.put(key2, id2); params.put(key3, id3); return readAsync(count, timeout, unit, id, params); } @Override - public Map>> read(StreamId id, String key2, StreamId id2) { + public Map>> read(StreamMessageId id, String key2, StreamMessageId id2) { return get(readAsync(id, key2, id2)); } @Override - public Map>> read(StreamId id, String key2, StreamId id2, String key3, - StreamId id3) { + public Map>> read(StreamMessageId id, String key2, StreamMessageId id2, String key3, + StreamMessageId id3) { return get(readAsync(id, key2, id2, key3, id3)); } @Override - public Map>> read(int count, StreamId id, String key2, StreamId id2) { + public Map>> read(int count, StreamMessageId id, String key2, StreamMessageId id2) { return get(readAsync(count, id, key2, id2)); } @Override - public Map>> read(int count, StreamId id, String key2, StreamId id2, String key3, - StreamId id3) { + public Map>> read(int count, StreamMessageId id, String key2, StreamMessageId id2, String key3, + StreamMessageId id3) { return get(readAsync(count, id, key2, id2, key3, id3)); } @Override - public Map>> read(long timeout, TimeUnit unit, StreamId id, String key2, - StreamId id2) { + public Map>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2) { return get(readAsync(timeout, unit, id, key2, id2)); } @Override - public Map>> read(long timeout, TimeUnit unit, StreamId id, String key2, - StreamId id2, String key3, StreamId id3) { + public Map>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2, String key3, StreamMessageId id3) { return get(readAsync(timeout, unit, id, key2, id2, key3, id3)); } @Override - public Map>> read(int count, long timeout, TimeUnit unit, StreamId id, String key2, - StreamId id2) { + public Map>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2) { return get(readAsync(count, timeout, unit, id, key2, id2)); } @Override - public Map>> read(int count, long timeout, TimeUnit unit, StreamId id, String key2, - StreamId id2, String key3, StreamId id3) { + public Map>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2, String key3, StreamMessageId id3) { return get(readAsync(count, timeout, unit, id, key2, id2, key3, id3)); } + @Override + public RFuture removeAsync(StreamMessageId... ids) { + List params = new ArrayList(); + params.add(getName()); + for (StreamMessageId id : ids) { + params.add(id); + } + + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XDEL, params.toArray()); + } + + @Override + public long remove(StreamMessageId... ids) { + return get(removeAsync(ids)); + } + + @Override + public RFuture trimAsync(int count) { + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XTRIM, "MAXLEN", count); + } + + @Override + public RFuture trimNonStrictAsync(int count) { + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XTRIM, "MAXLEN", "~", count); + } + + @Override + public long trim(int count) { + return get(trimAsync(count)); + } + + @Override + public long trimNonStrict(int count) { + return get(trimNonStrictAsync(count)); + } + + @Override + public RFuture removeGroupAsync(String groupName) { + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "DESTROY", getName(), groupName); + } + + @Override + public void removeGroup(String groupName) { + get(removeGroupAsync(groupName)); + } + + @Override + public RFuture removeConsumerAsync(String groupName, String consumerName) { + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP_LONG, "DELCONSUMER", getName(), groupName, consumerName); + } + + @Override + public long removeConsumer(String groupName, String consumerName) { + return get(removeConsumerAsync(groupName, consumerName)); + } + + @Override + public RFuture updateGroupMessageIdAsync(String groupName, StreamMessageId id) { + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "SETID", getName(), groupName, id); + } + + @Override + public void updateGroupMessageId(String groupName, StreamMessageId id) { + get(updateGroupMessageIdAsync(groupName, id)); + } + } diff --git a/redisson/src/main/java/org/redisson/api/PendingEntry.java b/redisson/src/main/java/org/redisson/api/PendingEntry.java index 16d2e268c..3aa0cfabc 100644 --- a/redisson/src/main/java/org/redisson/api/PendingEntry.java +++ b/redisson/src/main/java/org/redisson/api/PendingEntry.java @@ -23,12 +23,12 @@ package org.redisson.api; */ public class PendingEntry { - private StreamId id; + private StreamMessageId id; private String consumerName; private long idleTime; private long lastTimeDelivered; - public PendingEntry(StreamId id, String consumerName, long idleTime, long lastTimeDelivered) { + public PendingEntry(StreamMessageId id, String consumerName, long idleTime, long lastTimeDelivered) { super(); this.id = id; this.consumerName = consumerName; @@ -41,7 +41,7 @@ public class PendingEntry { * * @return id */ - public StreamId getId() { + public StreamMessageId getId() { return id; } diff --git a/redisson/src/main/java/org/redisson/api/PendingResult.java b/redisson/src/main/java/org/redisson/api/PendingResult.java index 23f593bb6..b9456f97f 100644 --- a/redisson/src/main/java/org/redisson/api/PendingResult.java +++ b/redisson/src/main/java/org/redisson/api/PendingResult.java @@ -29,14 +29,14 @@ public class PendingResult implements Serializable { private static final long serialVersionUID = -5525031552305408248L; private long total; - private StreamId lowestId; - private StreamId highestId; + private StreamMessageId lowestId; + private StreamMessageId highestId; private Map consumerNames; public PendingResult() { } - public PendingResult(long total, StreamId lowestId, StreamId highestId, Map consumerNames) { + public PendingResult(long total, StreamMessageId lowestId, StreamMessageId highestId, Map consumerNames) { super(); this.total = total; this.lowestId = lowestId; @@ -58,7 +58,7 @@ public class PendingResult implements Serializable { * * @return number */ - public StreamId getLowestId() { + public StreamMessageId getLowestId() { return lowestId; } @@ -67,7 +67,7 @@ public class PendingResult implements Serializable { * * @return number */ - public StreamId getHighestId() { + public StreamMessageId getHighestId() { return highestId; } diff --git a/redisson/src/main/java/org/redisson/api/RStream.java b/redisson/src/main/java/org/redisson/api/RStream.java index 79d1e4739..27dc0ff5c 100644 --- a/redisson/src/main/java/org/redisson/api/RStream.java +++ b/redisson/src/main/java/org/redisson/api/RStream.java @@ -39,24 +39,48 @@ public interface RStream extends RStreamAsync, RExpirable { void createGroup(String groupName); /** - * Creates consumer group by name and stream id. + * Creates consumer group by name and Stream Message ID. * Only new messages after defined stream id will be available for consumers of this group. *

- * {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating + * {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creating * * @param groupName - name of group - * @param id - stream id + * @param id - Stream Message ID */ - void createGroup(String groupName, StreamId id); + void createGroup(String groupName, StreamMessageId id); + + /** + * Removes group by name. + * + * @param groupName - name of group + */ + void removeGroup(String groupName); + + /** + * Removes consumer of the group by name. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @return number of pending messages owned by consumer + */ + long removeConsumer(String groupName, String consumerName); + + /** + * Updates next message id delivered to consumers. + * + * @param groupName - name of group + * @param id - Stream Message ID + */ + void updateGroupMessageId(String groupName, StreamMessageId id); /** * Marks pending messages by group name and stream ids as correctly processed. * * @param groupName - name of group - * @param ids - stream ids + * @param ids - Stream Message IDs * @return marked messages amount */ - Long ack(String groupName, StreamId... ids); + long ack(String groupName, StreamMessageId... ids); /** * Returns pending messages by group name @@ -68,34 +92,47 @@ public interface RStream extends RStreamAsync, RExpirable { /** * Returns list of pending messages by group name. - * Limited by start stream id and end stream id and count. + * Limited by start Stream Message ID and end Stream Message ID and count. *

- * {@link StreamId#MAX} is used as max stream id - * {@link StreamId#MIN} is used as min stream id + * {@link StreamMessageId#MAX} is used as max Stream Message ID + * {@link StreamMessageId#MIN} is used as min Stream Message ID * * @param groupName - name of group - * @param startId - start stream id - * @param endId - end stream id + * @param startId - start Stream Message ID + * @param endId - end Stream Message ID * @param count - amount of messages * @return list */ - List listPending(String groupName, StreamId startId, StreamId endId, int count); + List listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count); /** * Returns list of pending messages by group name and consumer name. - * Limited by start stream id and end stream id and count. + * Limited by start Stream Message ID and end Stream Message ID and count. *

- * {@link StreamId#MAX} is used as max stream id - * {@link StreamId#MIN} is used as min stream id + * {@link StreamMessageId#MAX} is used as max Stream Message ID + * {@link StreamMessageId#MIN} is used as min Stream Message ID * * @param consumerName - name of consumer * @param groupName - name of group - * @param startId - start stream id - * @param endId - end stream id + * @param startId - start Stream Message ID + * @param endId - end Stream Message ID * @param count - amount of messages * @return list */ - List listPending(String groupName, StreamId startId, StreamId endId, int count, String consumerName); + List listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count); + + /** + * Transfers ownership of pending messages by id to a new consumer + * by name if idle time of messages is greater than defined value. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param idleTime - minimum idle time of messages + * @param idleTimeUnit - idle time unit + * @param ids - Stream Message IDs + * @return stream data mapped by Stream Message ID + */ + Map> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids); /** * Transfers ownership of pending messages by id to a new consumer @@ -105,47 +142,47 @@ public interface RStream extends RStreamAsync, RExpirable { * @param consumerName - name of consumer * @param idleTime - minimum idle time of messages * @param idleTimeUnit - idle time unit - * @param ids - stream ids - * @return stream data mapped by Stream ID + * @param ids - Stream Message IDs + * @return list of Stream Message IDs */ - Map> claimPending(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids); + List fastClaim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids); /** - * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * Read stream data from groupName by consumerName and specified collection of Stream Message IDs. * * @param groupName - name of group * @param consumerName - name of consumer - * @param ids - collection of Stream IDs - * @return stream data mapped by Stream ID + * @param ids - collection of Stream Message IDs + * @return stream data mapped by Stream Message ID */ - Map> readGroup(String groupName, String consumerName, StreamId ... ids); + Map> readGroup(String groupName, String consumerName, StreamMessageId ... ids); /** - * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * Read stream data from groupName by consumerName and specified collection of Stream Message IDs. * * @param groupName - name of group * @param consumerName - name of consumer * @param count - stream data size limit - * @param ids - collection of Stream IDs - * @return stream data mapped by Stream ID + * @param ids - collection of Stream Message IDs + * @return stream data mapped by Stream Message ID */ - Map> readGroup(String groupName, String consumerName, int count, StreamId ... ids); + Map> readGroup(String groupName, String consumerName, int count, StreamMessageId ... ids); /** - * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * Read stream data from groupName by consumerName and specified collection of Stream Message IDs. * Wait for stream data availability for specified timeout interval. * * @param groupName - name of group * @param consumerName - name of consumer * @param timeout - time interval to wait for stream data availability * @param unit - time interval unit - * @param ids - collection of Stream IDs - * @return stream data mapped by Stream ID + * @param ids - collection of Stream Message IDs + * @return stream data mapped by Stream Message ID */ - Map> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids); + Map> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids); /** - * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * Read stream data from groupName by consumerName and specified collection of Stream Message IDs. * Wait for stream data availability for specified timeout interval. * * @param groupName - name of group @@ -153,10 +190,10 @@ public interface RStream extends RStreamAsync, RExpirable { * @param count - stream data size limit * @param timeout - time interval to wait for stream data availability * @param unit - time interval unit - * @param ids - collection of Stream IDs - * @return stream data mapped by Stream ID + * @param ids - collection of Stream Message IDs + * @return stream data mapped by Stream Message ID */ - Map> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids); + Map> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids); /** * Returns number of entries in stream @@ -166,25 +203,25 @@ public interface RStream extends RStreamAsync, RExpirable { long size(); /** - * Appends a new entry and returns generated Stream ID + * Appends a new entry and returns generated Stream Message ID * * @param key - key of entry * @param value - value of entry - * @return Stream ID + * @return Stream Message ID */ - StreamId add(K key, V value); + StreamMessageId add(K key, V value); /** - * Appends a new entry by specified Stream ID + * Appends a new entry by specified Stream Message ID * - * @param id - Stream ID + * @param id - Stream Message ID * @param key - key of entry * @param value - value of entry */ - void add(StreamId id, K key, V value); + void add(StreamMessageId id, K key, V value); /** - * Appends a new entry and returns generated Stream ID. + * Appends a new entry and returns generated Stream Message ID. * Trims stream to a specified trimLen size. * If trimStrict is false then trims to few tens of entries more than specified length to trim. * @@ -192,102 +229,102 @@ public interface RStream extends RStreamAsync, RExpirable { * @param value - value of entry * @param trimLen - length to trim * @param trimStrict - if false then trims to few tens of entries more than specified length to trim - * @return Stream ID + * @return Stream Message ID */ - StreamId add(K key, V value, int trimLen, boolean trimStrict); + StreamMessageId add(K key, V value, int trimLen, boolean trimStrict); /** - * Appends a new entry by specified Stream ID. + * Appends a new entry by specified Stream Message ID. * Trims stream to a specified trimLen size. * If trimStrict is false then trims to few tens of entries more than specified length to trim. * - * @param id - Stream ID + * @param id - Stream Message ID * @param key - key of entry * @param value - value of entry * @param trimLen - length to trim * @param trimStrict - if false then trims to few tens of entries more than specified length to trim */ - void add(StreamId id, K key, V value, int trimLen, boolean trimStrict); + void add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict); /** - * Appends new entries and returns generated Stream ID + * Appends new entries and returns generated Stream Message ID * * @param entries - entries to add - * @return Stream ID + * @return Stream Message ID */ - StreamId addAll(Map entries); + StreamMessageId addAll(Map entries); /** - * Appends new entries by specified Stream ID + * Appends new entries by specified Stream Message ID * - * @param id - Stream ID + * @param id - Stream Message ID * @param entries - entries to add */ - void addAll(StreamId id, Map entries); + void addAll(StreamMessageId id, Map entries); /** - * Appends new entries and returns generated Stream ID. + * Appends new entries and returns generated Stream Message ID. * Trims stream to a specified trimLen size. * If trimStrict is false then trims to few tens of entries more than specified length to trim. * * @param entries - entries to add * @param trimLen - length to trim * @param trimStrict - if false then trims to few tens of entries more than specified length to trim - * @return Stream ID + * @return Stream Message ID */ - StreamId addAll(Map entries, int trimLen, boolean trimStrict); + StreamMessageId addAll(Map entries, int trimLen, boolean trimStrict); /** - * Appends new entries by specified Stream ID. + * Appends new entries by specified Stream Message ID. * Trims stream to a specified trimLen size. * If trimStrict is false then trims to few tens of entries more than specified length to trim. * - * @param id - Stream ID + * @param id - Stream Message ID * @param entries - entries to add * @param trimLen - length to trim * @param trimStrict - if false then trims to few tens of entries more than specified length to trim */ - void addAll(StreamId id, Map entries, int trimLen, boolean trimStrict); + void addAll(StreamMessageId id, Map entries, int trimLen, boolean trimStrict); /** - * Read stream data by specified collection of Stream IDs. + * Read stream data by specified collection of Stream Message IDs. * - * @param ids - collection of Stream IDs - * @return stream data mapped by Stream ID + * @param ids - collection of Stream Message IDs + * @return stream data mapped by Stream Message ID */ - Map> read(StreamId ... ids); + Map> read(StreamMessageId ... ids); /** - * Read stream data by specified collection of Stream IDs. + * Read stream data by specified collection of Stream Message IDs. * * @param count - stream data size limit - * @param ids - collection of Stream IDs - * @return stream data mapped by Stream ID + * @param ids - collection of Stream Message IDs + * @return stream data mapped by Stream Message ID */ - Map> read(int count, StreamId ... ids); + Map> read(int count, StreamMessageId ... ids); /** - * Read stream data by specified collection of Stream IDs. + * Read stream data by specified collection of Stream Message IDs. * Wait for stream data availability for specified timeout interval. * * @param timeout - time interval to wait for stream data availability * @param unit - time interval unit - * @param ids - collection of Stream IDs - * @return stream data mapped by Stream ID + * @param ids - collection of Stream Message IDs + * @return stream data mapped by Stream Message ID */ - Map> read(long timeout, TimeUnit unit, StreamId ... ids); + Map> read(long timeout, TimeUnit unit, StreamMessageId ... ids); /** - * Read stream data by specified collection of Stream IDs. + * Read stream data by specified collection of Stream Message IDs. * Wait for stream data availability for specified timeout interval. * * @param count - stream data size limit * @param timeout - time interval to wait for stream data availability * @param unit - time interval unit - * @param ids - collection of Stream IDs - * @return stream data mapped by Stream ID + * @param ids - collection of Stream Message IDs + * @return stream data mapped by Stream Message ID */ - Map> read(int count, long timeout, TimeUnit unit, StreamId ... ids); + Map> read(int count, long timeout, TimeUnit unit, StreamMessageId ... ids); /** * Read stream data by specified stream name including this stream. @@ -295,9 +332,9 @@ public interface RStream extends RStreamAsync, RExpirable { * @param id - id of this stream * @param name2 - name of second stream * @param id2 - id of second stream - * @return stream data mapped by key and Stream ID + * @return stream data mapped by key and Stream Message ID */ - Map>> read(StreamId id, String name2, StreamId id2); + Map>> read(StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -307,18 +344,18 @@ public interface RStream extends RStreamAsync, RExpirable { * @param id2 - id of second stream * @param name3 - name of third stream * @param id3 - id of third stream - * @return stream data mapped by key and Stream ID + * @return stream data mapped by key and Stream Message ID */ - Map>> read(StreamId id, String name2, StreamId id2, String name3, StreamId id3); + Map>> read(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** - * Read stream data by specified stream id mapped by name including this stream. + * Read stream data by specified Stream Message ID mapped by name including this stream. * * @param id - id of this stream - * @param nameToId - stream id mapped by name - * @return stream data mapped by key and Stream ID + * @param nameToId - Stream Message ID mapped by name + * @return stream data mapped by key and Stream Message ID */ - Map>> read(StreamId id, Map nameToId); + Map>> read(StreamMessageId id, Map nameToId); /** * Read stream data by specified stream name including this stream. @@ -327,9 +364,9 @@ public interface RStream extends RStreamAsync, RExpirable { * @param id - id of this stream * @param name2 - name of second stream * @param id2 - id of second stream - * @return stream data mapped by key and Stream ID + * @return stream data mapped by key and Stream Message ID */ - Map>> read(int count, StreamId id, String name2, StreamId id2); + Map>> read(int count, StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -340,19 +377,19 @@ public interface RStream extends RStreamAsync, RExpirable { * @param id2 - id of second stream * @param name3 - name of third stream * @param id3 - id of third stream - * @return stream data mapped by key and Stream ID + * @return stream data mapped by key and Stream Message ID */ - Map>> read(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + Map>> read(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** - * Read stream data by specified stream id mapped by name including this stream. + * Read stream data by specified Stream Message ID mapped by name including this stream. * * @param count - stream data size limit * @param id - id of this stream - * @param nameToId - stream id mapped by name - * @return stream data mapped by key and Stream ID + * @param nameToId - Stream Message ID mapped by name + * @return stream data mapped by key and Stream Message ID */ - Map>> read(int count, StreamId id, Map nameToId); + Map>> read(int count, StreamMessageId id, Map nameToId); /** * Read stream data by specified stream name including this stream. @@ -363,9 +400,9 @@ public interface RStream extends RStreamAsync, RExpirable { * @param id - id of this stream * @param name2 - name of second stream * @param id2 - id of second stream - * @return stream data mapped by key and Stream ID + * @return stream data mapped by key and Stream Message ID */ - Map>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2); + Map>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -378,21 +415,21 @@ public interface RStream extends RStreamAsync, RExpirable { * @param id2 - id of second stream * @param name3 - name of third stream * @param id3 - id of third stream - * @return stream data mapped by key and Stream ID + * @return stream data mapped by key and Stream Message ID */ - Map>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + Map>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** - * Read stream data by specified stream id mapped by name including this stream. + * Read stream data by specified Stream Message ID mapped by name including this stream. * Wait for the first stream data availability for specified timeout interval. * * @param timeout - time interval to wait for stream data availability * @param unit - time interval unit * @param id - id of this stream - * @param nameToId - stream id mapped by name - * @return stream data mapped by key and Stream ID + * @param nameToId - Stream Message ID mapped by name + * @return stream data mapped by key and Stream Message ID */ - Map>> read(long timeout, TimeUnit unit, StreamId id, Map nameToId); + Map>> read(long timeout, TimeUnit unit, StreamMessageId id, Map nameToId); /** * Read stream data by specified stream name including this stream. @@ -404,9 +441,9 @@ public interface RStream extends RStreamAsync, RExpirable { * @param id - id of this stream * @param name2 - name of second stream * @param id2 - id of second stream - * @return stream data mapped by key and Stream ID + * @return stream data mapped by key and Stream Message ID */ - Map>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2); + Map>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -420,59 +457,83 @@ public interface RStream extends RStreamAsync, RExpirable { * @param id2 - id of second stream * @param name3 - name of third stream * @param id3 - id of third stream - * @return stream data mapped by key and Stream ID + * @return stream data mapped by key and Stream Message ID */ - Map>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + Map>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** - * Read stream data by specified stream id mapped by name including this stream. + * Read stream data by specified Stream Message ID mapped by name including this stream. * Wait for the first stream data availability for specified timeout interval. * * @param count - stream data size limit * @param timeout - time interval to wait for stream data availability * @param unit - time interval unit * @param id - id of this stream - * @param nameToId - stream id mapped by name - * @return stream data mapped by key and Stream ID + * @param nameToId - Stream Message ID mapped by name + * @return stream data mapped by key and Stream Message ID */ - Map>> read(int count, long timeout, TimeUnit unit, StreamId id, Map nameToId); + Map>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map nameToId); /** - * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). + * Read stream data in range by specified start Stream Message ID (included) and end Stream Message ID (included). * - * @param startId - start Stream ID - * @param endId - end Stream ID - * @return stream data mapped by Stream ID + * @param startId - start Stream Message ID + * @param endId - end Stream Message ID + * @return stream data mapped by Stream Message ID */ - Map> range(StreamId startId, StreamId endId); + Map> range(StreamMessageId startId, StreamMessageId endId); /** - * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). + * Read stream data in range by specified start Stream Message ID (included) and end Stream Message ID (included). * * @param count - stream data size limit - * @param startId - start Stream ID - * @param endId - end Stream ID - * @return stream data mapped by Stream ID + * @param startId - start Stream Message ID + * @param endId - end Stream Message ID + * @return stream data mapped by Stream Message ID */ - Map> range(int count, StreamId startId, StreamId endId); + Map> range(int count, StreamMessageId startId, StreamMessageId endId); /** - * Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included). + * Read stream data in reverse order in range by specified start Stream Message ID (included) and end Stream Message ID (included). * - * @param startId - start Stream ID - * @param endId - end Stream ID - * @return stream data mapped by Stream ID + * @param startId - start Stream Message ID + * @param endId - end Stream Message ID + * @return stream data mapped by Stream Message ID */ - Map> rangeReversed(StreamId startId, StreamId endId); + Map> rangeReversed(StreamMessageId startId, StreamMessageId endId); /** - * Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included). + * Read stream data in reverse order in range by specified start Stream Message ID (included) and end Stream Message ID (included). * * @param count - stream data size limit - * @param startId - start Stream ID - * @param endId - end Stream ID - * @return stream data mapped by Stream ID + * @param startId - start Stream Message ID + * @param endId - end Stream Message ID + * @return stream data mapped by Stream Message ID + */ + Map> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId); + + /** + * Removes messages by id. + * + * @param ids - id of messages to remove + * @return deleted messages amount + */ + long remove(StreamMessageId... ids); + + /** + * Trims stream to specified size + * + * @param size - new size of stream + * @return number of deleted messages + */ + long trim(int size); + + /** + * Trims stream to few tens of entries more than specified length to trim. + * + * @param size - new size of stream + * @return number of deleted messages */ - Map> rangeReversed(int count, StreamId startId, StreamId endId); + long trimNonStrict(int size); } diff --git a/redisson/src/main/java/org/redisson/api/RStreamAsync.java b/redisson/src/main/java/org/redisson/api/RStreamAsync.java index 92e64bc43..fa85a3f6c 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamAsync.java +++ b/redisson/src/main/java/org/redisson/api/RStreamAsync.java @@ -43,13 +43,39 @@ public interface RStreamAsync extends RExpirableAsync { * Creates consumer group by name and stream id. * Only new messages after defined stream id will be available for consumers of this group. *

- * {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating + * {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creating * * @param groupName - name of group * @param id - stream id * @return void */ - RFuture createGroupAsync(String groupName, StreamId id); + RFuture createGroupAsync(String groupName, StreamMessageId id); + + /** + * Removes group by name. + * + * @param groupName - name of group + * @return void + */ + RFuture removeGroupAsync(String groupName); + + /** + * Removes consumer of the group by name. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @return number of pending messages owned by consumer + */ + RFuture removeConsumerAsync(String groupName, String consumerName); + + /** + * Updates next message id delivered to consumers. + * + * @param groupName - name of group + * @param id - Stream Message ID + * @return void + */ + RFuture updateGroupMessageIdAsync(String groupName, StreamMessageId id); /** * Marks pending messages by group name and stream ids as correctly processed. @@ -58,7 +84,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param ids - stream ids * @return marked messages amount */ - RFuture ackAsync(String groupName, StreamId... ids); + RFuture ackAsync(String groupName, StreamMessageId... ids); /** * Returns pending messages by group name @@ -72,8 +98,8 @@ public interface RStreamAsync extends RExpirableAsync { * Returns list of pending messages by group name. * Limited by start stream id and end stream id and count. *

- * {@link StreamId#MAX} is used as max stream id - * {@link StreamId#MIN} is used as min stream id + * {@link StreamMessageId#MAX} is used as max stream id + * {@link StreamMessageId#MIN} is used as min stream id * * @param groupName - name of group * @param startId - start stream id @@ -81,14 +107,14 @@ public interface RStreamAsync extends RExpirableAsync { * @param count - amount of messages * @return list */ - RFuture> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count); + RFuture> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, int count); /** * Returns list of pending messages by group name and consumer name. * Limited by start stream id and end stream id and count. *

- * {@link StreamId#MAX} is used as max stream id - * {@link StreamId#MIN} is used as min stream id + * {@link StreamMessageId#MAX} is used as max stream id + * {@link StreamMessageId#MIN} is used as min stream id * * @param consumerName - name of consumer * @param groupName - name of group @@ -97,7 +123,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param count - amount of messages * @return list */ - RFuture> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count, String consumerName); + RFuture> listPendingAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count); /** * Transfers ownership of pending messages by id to a new consumer @@ -110,7 +136,20 @@ public interface RStreamAsync extends RExpirableAsync { * @param ids - stream ids * @return stream data mapped by Stream ID */ - RFuture>> claimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids); + RFuture>> claimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids); + + /** + * Transfers ownership of pending messages by id to a new consumer + * by name if idle time of messages is greater than defined value. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param idleTime - minimum idle time of messages + * @param idleTimeUnit - idle time unit + * @param ids - Stream Message IDs + * @return list of Stream Message IDs + */ + RFuture> fastClaimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids); /** * Read stream data from groupName by consumerName and specified collection of Stream IDs. @@ -120,7 +159,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - RFuture>> readGroupAsync(String groupName, String consumerName, StreamId ... ids); + RFuture>> readGroupAsync(String groupName, String consumerName, StreamMessageId ... ids); /** * Read stream data from groupName by consumerName and specified collection of Stream IDs. @@ -131,7 +170,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - RFuture>> readGroupAsync(String groupName, String consumerName, int count, StreamId ... ids); + RFuture>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId ... ids); /** * Read stream data from groupName by consumerName and specified collection of Stream IDs. @@ -144,7 +183,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - RFuture>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids); + RFuture>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids); /** * Read stream data from groupName by consumerName and specified collection of Stream IDs. @@ -158,7 +197,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - RFuture>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids); + RFuture>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids); /** * Returns number of entries in stream @@ -174,7 +213,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param value - value of entry * @return Stream ID */ - RFuture addAsync(K key, V value); + RFuture addAsync(K key, V value); /** * Appends a new entry by specified Stream ID @@ -184,7 +223,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param value - value of entry * @return void */ - RFuture addAsync(StreamId id, K key, V value); + RFuture addAsync(StreamMessageId id, K key, V value); /** * Appends a new entry and returns generated Stream ID. @@ -197,7 +236,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param trimStrict - if false then trims to few tens of entries more than specified length to trim * @return Stream ID */ - RFuture addAsync(K key, V value, int trimLen, boolean trimStrict); + RFuture addAsync(K key, V value, int trimLen, boolean trimStrict); /** * Appends a new entry by specified Stream ID. @@ -211,7 +250,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param trimStrict - if false then trims to few tens of entries more than specified length to trim * @return void */ - RFuture addAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict); + RFuture addAsync(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict); /** * Appends new entries and returns generated Stream ID @@ -219,7 +258,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param entries - entries to add * @return Stream ID */ - RFuture addAllAsync(Map entries); + RFuture addAllAsync(Map entries); /** * Appends new entries by specified Stream ID @@ -228,7 +267,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param entries - entries to add * @return void */ - RFuture addAllAsync(StreamId id, Map entries); + RFuture addAllAsync(StreamMessageId id, Map entries); /** * Appends new entries and returns generated Stream ID. @@ -240,7 +279,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param trimStrict - if false then trims to few tens of entries more than specified length to trim * @return Stream ID */ - RFuture addAllAsync(Map entries, int trimLen, boolean trimStrict); + RFuture addAllAsync(Map entries, int trimLen, boolean trimStrict); /** * Appends new entries by specified Stream ID. @@ -253,7 +292,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param trimStrict - if false then trims to few tens of entries more than specified length to trim * @return void */ - RFuture addAllAsync(StreamId id, Map entries, int trimLen, boolean trimStrict); + RFuture addAllAsync(StreamMessageId id, Map entries, int trimLen, boolean trimStrict); /** * Read stream data by specified collection of Stream IDs. @@ -261,7 +300,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - RFuture>> readAsync(StreamId ... ids); + RFuture>> readAsync(StreamMessageId ... ids); /** * Read stream data by specified collection of Stream IDs. @@ -270,7 +309,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - RFuture>> readAsync(int count, StreamId ... ids); + RFuture>> readAsync(int count, StreamMessageId ... ids); /** * Read stream data by specified collection of Stream IDs. @@ -281,7 +320,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - RFuture>> readAsync(long timeout, TimeUnit unit, StreamId ... ids); + RFuture>> readAsync(long timeout, TimeUnit unit, StreamMessageId ... ids); /** * Read stream data by specified collection of Stream IDs. @@ -293,7 +332,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - RFuture>> readAsync(int count, long timeout, TimeUnit unit, StreamId ... ids); + RFuture>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId ... ids); /** * Read stream data by specified stream name including this stream. @@ -303,7 +342,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param id2 - id of second stream * @return stream data mapped by key and Stream ID */ - RFuture>>> readAsync(StreamId id, String name2, StreamId id2); + RFuture>>> readAsync(StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -315,7 +354,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param id3 - id of third stream * @return stream data mapped by key and Stream ID */ - RFuture>>> readAsync(StreamId id, String name2, StreamId id2, String name3, StreamId id3); + RFuture>>> readAsync(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** * Read stream data by specified stream id mapped by name including this stream. @@ -324,7 +363,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - RFuture>>> readAsync(StreamId id, Map nameToId); + RFuture>>> readAsync(StreamMessageId id, Map nameToId); /** * Read stream data by specified stream name including this stream. @@ -335,7 +374,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param id2 - id of second stream * @return stream data mapped by key and Stream ID */ - RFuture>>> readAsync(int count, StreamId id, String name2, StreamId id2); + RFuture>>> readAsync(int count, StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -348,7 +387,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param id3 - id of third stream * @return stream data mapped by key and Stream ID */ - RFuture>>> readAsync(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + RFuture>>> readAsync(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** * Read stream data by specified stream id mapped by name including this stream. @@ -358,7 +397,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - RFuture>>> readAsync(int count, StreamId id, Map nameToId); + RFuture>>> readAsync(int count, StreamMessageId id, Map nameToId); /** * Read stream data by specified stream name including this stream. @@ -371,7 +410,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param id2 - id of second stream * @return stream data mapped by key and Stream ID */ - RFuture>>> readAsync(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2); + RFuture>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -386,7 +425,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param id3 - id of third stream * @return stream data mapped by key and Stream ID */ - RFuture>>> readAsync(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + RFuture>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** * Read stream data by specified stream id mapped by name including this stream. @@ -398,7 +437,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - RFuture>>> readAsync(long timeout, TimeUnit unit, StreamId id, Map nameToId); + RFuture>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, Map nameToId); /** * Read stream data by specified stream name including this stream. @@ -412,7 +451,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param id2 - id of second stream * @return stream data mapped by key and Stream ID */ - RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2); + RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -428,7 +467,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param id3 - id of third stream * @return stream data mapped by key and Stream ID */ - RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** * Read stream data by specified stream id mapped by name including this stream. @@ -441,7 +480,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, Map nameToId); + RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, Map nameToId); /** * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). @@ -450,7 +489,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param endId - end Stream ID * @return stream data mapped by Stream ID */ - RFuture>> rangeAsync(StreamId startId, StreamId endId); + RFuture>> rangeAsync(StreamMessageId startId, StreamMessageId endId); /** * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). @@ -460,7 +499,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param endId - end Stream ID * @return stream data mapped by Stream ID */ - RFuture>> rangeAsync(int count, StreamId startId, StreamId endId); + RFuture>> rangeAsync(int count, StreamMessageId startId, StreamMessageId endId); /** * Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included). @@ -469,7 +508,7 @@ public interface RStreamAsync extends RExpirableAsync { * @param endId - end Stream ID * @return stream data mapped by Stream ID */ - RFuture>> rangeReversedAsync(StreamId startId, StreamId endId); + RFuture>> rangeReversedAsync(StreamMessageId startId, StreamMessageId endId); /** * Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included). @@ -479,6 +518,30 @@ public interface RStreamAsync extends RExpirableAsync { * @param endId - end Stream ID * @return stream data mapped by Stream ID */ - RFuture>> rangeReversedAsync(int count, StreamId startId, StreamId endId); + RFuture>> rangeReversedAsync(int count, StreamMessageId startId, StreamMessageId endId); + + /** + * Removes messages by id. + * + * @param ids - id of messages to remove + * @return deleted messages amount + */ + RFuture removeAsync(StreamMessageId... ids); + + /** + * Trims stream to specified size + * + * @param size - new size of stream + * @return number of deleted messages + */ + RFuture trimAsync(int size); + + /** + * Trims stream to few tens of entries more than specified length to trim. + * + * @param size - new size of stream + * @return number of deleted messages + */ + RFuture trimNonStrictAsync(int size); } diff --git a/redisson/src/main/java/org/redisson/api/RStreamReactive.java b/redisson/src/main/java/org/redisson/api/RStreamReactive.java index 1ce1ce7c6..b6d838128 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamReactive.java +++ b/redisson/src/main/java/org/redisson/api/RStreamReactive.java @@ -45,13 +45,39 @@ public interface RStreamReactive extends RExpirableReactive { * Creates consumer group by name and stream id. * Only new messages after defined stream id will be available for consumers of this group. *

- * {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating + * {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creating * * @param groupName - name of group * @param id - stream id * @return void */ - Mono createGroup(String groupName, StreamId id); + Mono createGroup(String groupName, StreamMessageId id); + + /** + * Removes group by name. + * + * @param groupName - name of group + * @return void + */ + Mono removeGroup(String groupName); + + /** + * Removes consumer of the group by name. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @return number of pending messages owned by consumer + */ + Mono removeConsumer(String groupName, String consumerName); + + /** + * Updates next message id delivered to consumers. + * + * @param groupName - name of group + * @param id - Stream Message ID + * @return void + */ + Mono updateGroupMessageId(String groupName, StreamMessageId id); /** * Marks pending messages by group name and stream ids as correctly processed. @@ -60,7 +86,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param ids - stream ids * @return marked messages amount */ - Mono ack(String groupName, StreamId... ids); + Mono ack(String groupName, StreamMessageId... ids); /** * Returns pending messages by group name @@ -74,8 +100,8 @@ public interface RStreamReactive extends RExpirableReactive { * Returns list of pending messages by group name. * Limited by start stream id and end stream id and count. *

- * {@link StreamId#MAX} is used as max stream id - * {@link StreamId#MIN} is used as min stream id + * {@link StreamMessageId#MAX} is used as max stream id + * {@link StreamMessageId#MIN} is used as min stream id * * @param groupName - name of group * @param startId - start stream id @@ -83,14 +109,14 @@ public interface RStreamReactive extends RExpirableReactive { * @param count - amount of messages * @return list */ - Mono> listPending(String groupName, StreamId startId, StreamId endId, int count); + Mono> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count); /** * Returns list of pending messages by group name and consumer name. * Limited by start stream id and end stream id and count. *

- * {@link StreamId#MAX} is used as max stream id - * {@link StreamId#MIN} is used as min stream id + * {@link StreamMessageId#MAX} is used as max stream id + * {@link StreamMessageId#MIN} is used as min stream id * * @param consumerName - name of consumer * @param groupName - name of group @@ -99,7 +125,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param count - amount of messages * @return list */ - Mono> listPending(String groupName, StreamId startId, StreamId endId, int count, String consumerName); + Mono> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count); /** * Transfers ownership of pending messages by id to a new consumer @@ -112,7 +138,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param ids - stream ids * @return stream data mapped by Stream ID */ - Mono>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids); + Mono>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids); /** * Read stream data from groupName by consumerName and specified collection of Stream IDs. @@ -122,7 +148,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Mono>> readGroup(String groupName, String consumerName, StreamId ... ids); + Mono>> readGroup(String groupName, String consumerName, StreamMessageId ... ids); /** * Read stream data from groupName by consumerName and specified collection of Stream IDs. @@ -133,7 +159,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Mono>> readGroup(String groupName, String consumerName, int count, StreamId ... ids); + Mono>> readGroup(String groupName, String consumerName, int count, StreamMessageId ... ids); /** * Read stream data from groupName by consumerName and specified collection of Stream IDs. @@ -146,7 +172,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Mono>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids); + Mono>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids); /** * Read stream data from groupName by consumerName and specified collection of Stream IDs. @@ -160,7 +186,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Mono>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids); + Mono>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids); /** @@ -177,7 +203,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param value - value of entry * @return Stream ID */ - Mono add(K key, V value); + Mono add(K key, V value); /** * Appends a new entry by specified Stream ID @@ -187,7 +213,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param value - value of entry * @return void */ - Mono add(StreamId id, K key, V value); + Mono add(StreamMessageId id, K key, V value); /** * Appends a new entry and returns generated Stream ID. @@ -200,7 +226,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param trimStrict - if false then trims to few tens of entries more than specified length to trim * @return Stream ID */ - Mono add(K key, V value, int trimLen, boolean trimStrict); + Mono add(K key, V value, int trimLen, boolean trimStrict); /** * Appends a new entry by specified Stream ID. @@ -214,7 +240,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param trimStrict - if false then trims to few tens of entries more than specified length to trim * @return void */ - Mono add(StreamId id, K key, V value, int trimLen, boolean trimStrict); + Mono add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict); /** * Appends new entries and returns generated Stream ID @@ -222,7 +248,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param entries - entries to add * @return Stream ID */ - Mono addAll(Map entries); + Mono addAll(Map entries); /** * Appends new entries by specified Stream ID @@ -231,7 +257,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param entries - entries to add * @return void */ - Mono addAll(StreamId id, Map entries); + Mono addAll(StreamMessageId id, Map entries); /** * Appends new entries and returns generated Stream ID. @@ -243,7 +269,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param trimStrict - if false then trims to few tens of entries more than specified length to trim * @return Stream ID */ - Mono addAll(Map entries, int trimLen, boolean trimStrict); + Mono addAll(Map entries, int trimLen, boolean trimStrict); /** * Appends new entries by specified Stream ID. @@ -256,7 +282,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param trimStrict - if false then trims to few tens of entries more than specified length to trim * @return void */ - Mono addAll(StreamId id, Map entries, int trimLen, boolean trimStrict); + Mono addAll(StreamMessageId id, Map entries, int trimLen, boolean trimStrict); /** * Read stream data by specified collection of Stream IDs. @@ -264,7 +290,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Mono>> read(StreamId ... ids); + Mono>> read(StreamMessageId ... ids); /** * Read stream data by specified collection of Stream IDs. @@ -273,7 +299,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Mono>> read(int count, StreamId ... ids); + Mono>> read(int count, StreamMessageId ... ids); /** * Read stream data by specified collection of Stream IDs. @@ -284,7 +310,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Mono>> read(long timeout, TimeUnit unit, StreamId ... ids); + Mono>> read(long timeout, TimeUnit unit, StreamMessageId ... ids); /** * Read stream data by specified collection of Stream IDs. @@ -296,7 +322,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Mono>> read(int count, long timeout, TimeUnit unit, StreamId ... ids); + Mono>> read(int count, long timeout, TimeUnit unit, StreamMessageId ... ids); /** * Read stream data by specified stream name including this stream. @@ -306,7 +332,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param id2 - id of second stream * @return stream data mapped by key and Stream ID */ - Mono>>> read(StreamId id, String name2, StreamId id2); + Mono>>> read(StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -318,7 +344,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param id3 - id of third stream * @return stream data mapped by key and Stream ID */ - Mono>>> read(StreamId id, String name2, StreamId id2, String name3, StreamId id3); + Mono>>> read(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** * Read stream data by specified stream id mapped by name including this stream. @@ -327,7 +353,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - Mono>>> read(StreamId id, Map nameToId); + Mono>>> read(StreamMessageId id, Map nameToId); /** * Read stream data by specified stream name including this stream. @@ -338,7 +364,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param id2 - id of second stream * @return stream data mapped by key and Stream ID */ - Mono>>> read(int count, StreamId id, String name2, StreamId id2); + Mono>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -351,7 +377,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param id3 - id of third stream * @return stream data mapped by key and Stream ID */ - Mono>>> read(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + Mono>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** * Read stream data by specified stream id mapped by name including this stream. @@ -361,7 +387,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - Mono>>> read(int count, StreamId id, Map nameToId); + Mono>>> read(int count, StreamMessageId id, Map nameToId); /** * Read stream data by specified stream name including this stream. @@ -374,7 +400,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param id2 - id of second stream * @return stream data mapped by key and Stream ID */ - Mono>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2); + Mono>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -389,7 +415,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param id3 - id of third stream * @return stream data mapped by key and Stream ID */ - Mono>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + Mono>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** * Read stream data by specified stream id mapped by name including this stream. @@ -401,7 +427,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - Mono>>> read(long timeout, TimeUnit unit, StreamId id, Map nameToId); + Mono>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map nameToId); /** * Read stream data by specified stream name including this stream. @@ -415,7 +441,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param id2 - id of second stream * @return stream data mapped by key and Stream ID */ - Mono>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2); + Mono>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -431,7 +457,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param id3 - id of third stream * @return stream data mapped by key and Stream ID */ - Mono>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + Mono>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** * Read stream data by specified stream id mapped by name including this stream. @@ -444,7 +470,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - Mono>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map nameToId); + Mono>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map nameToId); /** * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). @@ -453,7 +479,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param endId - end Stream ID * @return stream data mapped by Stream ID */ - Mono>> range(StreamId startId, StreamId endId); + Mono>> range(StreamMessageId startId, StreamMessageId endId); /** * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). @@ -463,7 +489,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param endId - end Stream ID * @return stream data mapped by Stream ID */ - Mono>> range(int count, StreamId startId, StreamId endId); + Mono>> range(int count, StreamMessageId startId, StreamMessageId endId); /** * Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included). @@ -472,7 +498,7 @@ public interface RStreamReactive extends RExpirableReactive { * @param endId - end Stream ID * @return stream data mapped by Stream ID */ - Mono>> rangeReversed(StreamId startId, StreamId endId); + Mono>> rangeReversed(StreamMessageId startId, StreamMessageId endId); /** * Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included). @@ -482,6 +508,30 @@ public interface RStreamReactive extends RExpirableReactive { * @param endId - end Stream ID * @return stream data mapped by Stream ID */ - Mono>> rangeReversed(int count, StreamId startId, StreamId endId); + Mono>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId); + + /** + * Removes messages by id. + * + * @param ids - id of messages to remove + * @return deleted messages amount + */ + Mono remove(StreamMessageId... ids); + + /** + * Trims stream to specified size + * + * @param size - new size of stream + * @return number of deleted messages + */ + Mono trim(int size); + + /** + * Trims stream to few tens of entries more than specified length to trim. + * + * @param size - new size of stream + * @return number of deleted messages + */ + Mono trimNonStrict(int size); } diff --git a/redisson/src/main/java/org/redisson/api/RStreamRx.java b/redisson/src/main/java/org/redisson/api/RStreamRx.java index f18ec48f4..681aaa016 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamRx.java +++ b/redisson/src/main/java/org/redisson/api/RStreamRx.java @@ -45,13 +45,39 @@ public interface RStreamRx extends RExpirableRx { * Creates consumer group by name and stream id. * Only new messages after defined stream id will be available for consumers of this group. *

- * {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating + * {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creating * * @param groupName - name of group * @param id - stream id * @return void */ - Flowable createGroup(String groupName, StreamId id); + Flowable createGroup(String groupName, StreamMessageId id); + + /** + * Removes group by name. + * + * @param groupName - name of group + * @return void + */ + Flowable removeGroup(String groupName); + + /** + * Removes consumer of the group by name. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @return number of pending messages owned by consumer + */ + Flowable removeConsumer(String groupName, String consumerName); + + /** + * Updates next message id delivered to consumers. + * + * @param groupName - name of group + * @param id - Stream Message ID + * @return void + */ + Flowable updateGroupMessageId(String groupName, StreamMessageId id); /** * Marks pending messages by group name and stream ids as correctly processed. @@ -60,7 +86,7 @@ public interface RStreamRx extends RExpirableRx { * @param ids - stream ids * @return marked messages amount */ - Flowable ack(String groupName, StreamId... ids); + Flowable ack(String groupName, StreamMessageId... ids); /** * Returns pending messages by group name @@ -74,8 +100,8 @@ public interface RStreamRx extends RExpirableRx { * Returns list of pending messages by group name. * Limited by start stream id and end stream id and count. *

- * {@link StreamId#MAX} is used as max stream id - * {@link StreamId#MIN} is used as min stream id + * {@link StreamMessageId#MAX} is used as max stream id + * {@link StreamMessageId#MIN} is used as min stream id * * @param groupName - name of group * @param startId - start stream id @@ -83,14 +109,14 @@ public interface RStreamRx extends RExpirableRx { * @param count - amount of messages * @return list */ - Flowable> listPending(String groupName, StreamId startId, StreamId endId, int count); + Flowable> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count); /** * Returns list of pending messages by group name and consumer name. * Limited by start stream id and end stream id and count. *

- * {@link StreamId#MAX} is used as max stream id - * {@link StreamId#MIN} is used as min stream id + * {@link StreamMessageId#MAX} is used as max stream id + * {@link StreamMessageId#MIN} is used as min stream id * * @param consumerName - name of consumer * @param groupName - name of group @@ -99,7 +125,7 @@ public interface RStreamRx extends RExpirableRx { * @param count - amount of messages * @return list */ - Flowable> listPending(String groupName, StreamId startId, StreamId endId, int count, String consumerName); + Flowable> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count); /** * Transfers ownership of pending messages by id to a new consumer @@ -112,7 +138,7 @@ public interface RStreamRx extends RExpirableRx { * @param ids - stream ids * @return stream data mapped by Stream ID */ - Flowable>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids); + Flowable>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids); /** * Read stream data from groupName by consumerName and specified collection of Stream IDs. @@ -122,7 +148,7 @@ public interface RStreamRx extends RExpirableRx { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Flowable>> readGroup(String groupName, String consumerName, StreamId ... ids); + Flowable>> readGroup(String groupName, String consumerName, StreamMessageId ... ids); /** * Read stream data from groupName by consumerName and specified collection of Stream IDs. @@ -133,7 +159,7 @@ public interface RStreamRx extends RExpirableRx { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Flowable>> readGroup(String groupName, String consumerName, int count, StreamId ... ids); + Flowable>> readGroup(String groupName, String consumerName, int count, StreamMessageId ... ids); /** * Read stream data from groupName by consumerName and specified collection of Stream IDs. @@ -146,7 +172,7 @@ public interface RStreamRx extends RExpirableRx { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Flowable>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids); + Flowable>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids); /** * Read stream data from groupName by consumerName and specified collection of Stream IDs. @@ -160,7 +186,7 @@ public interface RStreamRx extends RExpirableRx { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Flowable>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids); + Flowable>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids); /** @@ -177,7 +203,7 @@ public interface RStreamRx extends RExpirableRx { * @param value - value of entry * @return Stream ID */ - Flowable add(K key, V value); + Flowable add(K key, V value); /** * Appends a new entry by specified Stream ID @@ -187,7 +213,7 @@ public interface RStreamRx extends RExpirableRx { * @param value - value of entry * @return void */ - Flowable add(StreamId id, K key, V value); + Flowable add(StreamMessageId id, K key, V value); /** * Appends a new entry and returns generated Stream ID. @@ -200,7 +226,7 @@ public interface RStreamRx extends RExpirableRx { * @param trimStrict - if false then trims to few tens of entries more than specified length to trim * @return Stream ID */ - Flowable add(K key, V value, int trimLen, boolean trimStrict); + Flowable add(K key, V value, int trimLen, boolean trimStrict); /** * Appends a new entry by specified Stream ID. @@ -214,7 +240,7 @@ public interface RStreamRx extends RExpirableRx { * @param trimStrict - if false then trims to few tens of entries more than specified length to trim * @return void */ - Flowable add(StreamId id, K key, V value, int trimLen, boolean trimStrict); + Flowable add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict); /** * Appends new entries and returns generated Stream ID @@ -222,7 +248,7 @@ public interface RStreamRx extends RExpirableRx { * @param entries - entries to add * @return Stream ID */ - Flowable addAll(Map entries); + Flowable addAll(Map entries); /** * Appends new entries by specified Stream ID @@ -231,7 +257,7 @@ public interface RStreamRx extends RExpirableRx { * @param entries - entries to add * @return void */ - Flowable addAll(StreamId id, Map entries); + Flowable addAll(StreamMessageId id, Map entries); /** * Appends new entries and returns generated Stream ID. @@ -243,7 +269,7 @@ public interface RStreamRx extends RExpirableRx { * @param trimStrict - if false then trims to few tens of entries more than specified length to trim * @return Stream ID */ - Flowable addAll(Map entries, int trimLen, boolean trimStrict); + Flowable addAll(Map entries, int trimLen, boolean trimStrict); /** * Appends new entries by specified Stream ID. @@ -256,7 +282,7 @@ public interface RStreamRx extends RExpirableRx { * @param trimStrict - if false then trims to few tens of entries more than specified length to trim * @return void */ - Flowable addAll(StreamId id, Map entries, int trimLen, boolean trimStrict); + Flowable addAll(StreamMessageId id, Map entries, int trimLen, boolean trimStrict); /** * Read stream data by specified collection of Stream IDs. @@ -264,7 +290,7 @@ public interface RStreamRx extends RExpirableRx { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Flowable>> read(StreamId ... ids); + Flowable>> read(StreamMessageId ... ids); /** * Read stream data by specified collection of Stream IDs. @@ -273,7 +299,7 @@ public interface RStreamRx extends RExpirableRx { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Flowable>> read(int count, StreamId ... ids); + Flowable>> read(int count, StreamMessageId ... ids); /** * Read stream data by specified collection of Stream IDs. @@ -284,7 +310,7 @@ public interface RStreamRx extends RExpirableRx { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Flowable>> read(long timeout, TimeUnit unit, StreamId ... ids); + Flowable>> read(long timeout, TimeUnit unit, StreamMessageId ... ids); /** * Read stream data by specified collection of Stream IDs. @@ -296,7 +322,7 @@ public interface RStreamRx extends RExpirableRx { * @param ids - collection of Stream IDs * @return stream data mapped by Stream ID */ - Flowable>> read(int count, long timeout, TimeUnit unit, StreamId ... ids); + Flowable>> read(int count, long timeout, TimeUnit unit, StreamMessageId ... ids); /** * Read stream data by specified stream name including this stream. @@ -306,7 +332,7 @@ public interface RStreamRx extends RExpirableRx { * @param id2 - id of second stream * @return stream data mapped by key and Stream ID */ - Flowable>>> read(StreamId id, String name2, StreamId id2); + Flowable>>> read(StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -318,7 +344,7 @@ public interface RStreamRx extends RExpirableRx { * @param id3 - id of third stream * @return stream data mapped by key and Stream ID */ - Flowable>>> read(StreamId id, String name2, StreamId id2, String name3, StreamId id3); + Flowable>>> read(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** * Read stream data by specified stream id mapped by name including this stream. @@ -327,7 +353,7 @@ public interface RStreamRx extends RExpirableRx { * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - Flowable>>> read(StreamId id, Map nameToId); + Flowable>>> read(StreamMessageId id, Map nameToId); /** * Read stream data by specified stream name including this stream. @@ -338,7 +364,7 @@ public interface RStreamRx extends RExpirableRx { * @param id2 - id of second stream * @return stream data mapped by key and Stream ID */ - Flowable>>> read(int count, StreamId id, String name2, StreamId id2); + Flowable>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -351,7 +377,7 @@ public interface RStreamRx extends RExpirableRx { * @param id3 - id of third stream * @return stream data mapped by key and Stream ID */ - Flowable>>> read(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + Flowable>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** * Read stream data by specified stream id mapped by name including this stream. @@ -361,7 +387,7 @@ public interface RStreamRx extends RExpirableRx { * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - Flowable>>> read(int count, StreamId id, Map nameToId); + Flowable>>> read(int count, StreamMessageId id, Map nameToId); /** * Read stream data by specified stream name including this stream. @@ -374,7 +400,7 @@ public interface RStreamRx extends RExpirableRx { * @param id2 - id of second stream * @return stream data mapped by key and Stream ID */ - Flowable>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2); + Flowable>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -389,7 +415,7 @@ public interface RStreamRx extends RExpirableRx { * @param id3 - id of third stream * @return stream data mapped by key and Stream ID */ - Flowable>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + Flowable>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** * Read stream data by specified stream id mapped by name including this stream. @@ -401,7 +427,7 @@ public interface RStreamRx extends RExpirableRx { * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - Flowable>>> read(long timeout, TimeUnit unit, StreamId id, Map nameToId); + Flowable>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map nameToId); /** * Read stream data by specified stream name including this stream. @@ -415,7 +441,7 @@ public interface RStreamRx extends RExpirableRx { * @param id2 - id of second stream * @return stream data mapped by key and Stream ID */ - Flowable>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2); + Flowable>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2); /** * Read stream data by specified stream names including this stream. @@ -431,7 +457,7 @@ public interface RStreamRx extends RExpirableRx { * @param id3 - id of third stream * @return stream data mapped by key and Stream ID */ - Flowable>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + Flowable>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3); /** * Read stream data by specified stream id mapped by name including this stream. @@ -444,7 +470,7 @@ public interface RStreamRx extends RExpirableRx { * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - Flowable>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map nameToId); + Flowable>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map nameToId); /** * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). @@ -453,7 +479,7 @@ public interface RStreamRx extends RExpirableRx { * @param endId - end Stream ID * @return stream data mapped by Stream ID */ - Flowable>> range(StreamId startId, StreamId endId); + Flowable>> range(StreamMessageId startId, StreamMessageId endId); /** * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). @@ -463,7 +489,7 @@ public interface RStreamRx extends RExpirableRx { * @param endId - end Stream ID * @return stream data mapped by Stream ID */ - Flowable>> range(int count, StreamId startId, StreamId endId); + Flowable>> range(int count, StreamMessageId startId, StreamMessageId endId); /** * Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included). @@ -472,7 +498,7 @@ public interface RStreamRx extends RExpirableRx { * @param endId - end Stream ID * @return stream data mapped by Stream ID */ - Flowable>> rangeReversed(StreamId startId, StreamId endId); + Flowable>> rangeReversed(StreamMessageId startId, StreamMessageId endId); /** * Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included). @@ -482,6 +508,30 @@ public interface RStreamRx extends RExpirableRx { * @param endId - end Stream ID * @return stream data mapped by Stream ID */ - Flowable>> rangeReversed(int count, StreamId startId, StreamId endId); + Flowable>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId); + + /** + * Removes messages by id. + * + * @param ids - id of messages to remove + * @return deleted messages amount + */ + Flowable remove(StreamMessageId... ids); + + /** + * Trims stream to specified size + * + * @param size - new size of stream + * @return number of deleted messages + */ + Flowable trim(int size); + + /** + * Trims stream to few tens of entries more than specified length to trim. + * + * @param size - new size of stream + * @return number of deleted messages + */ + Flowable trimNonStrict(int size); } diff --git a/redisson/src/main/java/org/redisson/api/StreamId.java b/redisson/src/main/java/org/redisson/api/StreamMessageId.java similarity index 86% rename from redisson/src/main/java/org/redisson/api/StreamId.java rename to redisson/src/main/java/org/redisson/api/StreamMessageId.java index ddf6bdbc8..a96491125 100644 --- a/redisson/src/main/java/org/redisson/api/StreamId.java +++ b/redisson/src/main/java/org/redisson/api/StreamMessageId.java @@ -21,34 +21,34 @@ package org.redisson.api; * @author Nikita Koksharov * */ -public class StreamId { +public class StreamMessageId { /** * Defines minimal id. Used in {@link RStream#range} and {@link RStreamAsync#rangeAsync} methods */ - public static final StreamId MIN = new StreamId(-1); + public static final StreamMessageId MIN = new StreamMessageId(-1); /** * Defines maximal id. Used in {@link RStream#range} and {@link RStreamAsync#rangeAsync} methods */ - public static final StreamId MAX = new StreamId(-1); + public static final StreamMessageId MAX = new StreamMessageId(-1); /** * Defines latest id to receive Stream entries added since method invocation. *

* Used in {@link RStream#read} and {@link RStreamAsync#readAsync} methods */ - public static final StreamId NEWEST = new StreamId(-1); + public static final StreamMessageId NEWEST = new StreamMessageId(-1); private long id0; private long id1; - public StreamId(long id0) { + public StreamMessageId(long id0) { super(); this.id0 = id0; } - public StreamId(long id0, long id1) { + public StreamMessageId(long id0, long id1) { super(); this.id0 = id0; this.id1 = id1; @@ -89,7 +89,7 @@ public class StreamId { return false; if (getClass() != obj.getClass()) return false; - StreamId other = (StreamId) obj; + StreamMessageId other = (StreamMessageId) obj; if (id0 != other.id0) return false; if (id1 != other.id1) diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 51d3118a4..31ddabb4f 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -23,7 +23,7 @@ import java.util.Map.Entry; import java.util.Set; import org.redisson.api.RType; -import org.redisson.api.StreamId; +import org.redisson.api.StreamMessageId; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor; @@ -70,6 +70,7 @@ import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder; import org.redisson.client.protocol.decoder.SlotsDecoder; import org.redisson.client.protocol.decoder.StreamIdDecoder; +import org.redisson.client.protocol.decoder.StreamIdListDecoder; import org.redisson.client.protocol.decoder.StreamResultDecoder; import org.redisson.client.protocol.decoder.StringDataDecoder; import org.redisson.client.protocol.decoder.StringListReplayDecoder; @@ -325,17 +326,17 @@ public interface RedisCommands { RedisCommand SETNX = new RedisCommand("SETNX", new BooleanReplayConvertor()); RedisCommand PSETEX = new RedisCommand("PSETEX", new VoidReplayConvertor()); - RedisCommand>>> XRANGE = new RedisCommand>>>("XRANGE", + RedisCommand>>> XRANGE = new RedisCommand>>>("XRANGE", new ListMultiDecoder( new ObjectDecoder(new StreamIdDecoder()), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET), new ObjectMapJoinDecoder())); - RedisCommand>>> XREVRANGE = new RedisCommand>>>("XREVRANGE", + RedisCommand>>> XREVRANGE = new RedisCommand>>>("XREVRANGE", XRANGE.getReplayMultiDecoder()); - RedisCommand>>> XREAD = new RedisCommand>>>("XREAD", + RedisCommand>>> XREAD = new RedisCommand>>>("XREAD", new ListMultiDecoder( new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(new StreamIdDecoder()), @@ -345,9 +346,9 @@ public interface RedisCommands { new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX), new StreamResultDecoder())); - RedisCommand>>> XREAD_BLOCKING = new RedisCommand>>>("XREAD", XREAD.getReplayMultiDecoder()); + RedisCommand>>> XREAD_BLOCKING = new RedisCommand>>>("XREAD", XREAD.getReplayMultiDecoder()); - RedisCommand>> XREAD_SINGLE = new RedisCommand>>("XREAD", + RedisCommand>> XREAD_SINGLE = new RedisCommand>>("XREAD", new ListMultiDecoder( new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(new StreamIdDecoder()), @@ -357,9 +358,9 @@ public interface RedisCommands { new ObjectMapReplayDecoder(), new StreamResultDecoder())); - RedisCommand>> XREAD_BLOCKING_SINGLE = new RedisCommand>>("XREAD", XREAD_SINGLE.getReplayMultiDecoder()); + RedisCommand>> XREAD_BLOCKING_SINGLE = new RedisCommand>>("XREAD", XREAD_SINGLE.getReplayMultiDecoder()); - RedisCommand>>> XREADGROUP = new RedisCommand>>>("XREADGROUP", + RedisCommand>>> XREADGROUP = new RedisCommand>>>("XREADGROUP", new ListMultiDecoder( new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(new StreamIdDecoder()), @@ -369,7 +370,7 @@ public interface RedisCommands { new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX), new StreamResultDecoder())); - RedisCommand>> XREADGROUP_BLOCKING = new RedisCommand>>("XREADGROUP", + RedisCommand>> XREADGROUP_BLOCKING = new RedisCommand>>("XREADGROUP", new ListMultiDecoder( new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(new StreamIdDecoder()), @@ -379,7 +380,7 @@ public interface RedisCommands { new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX), new StreamResultDecoder())); - RedisCommand>> XREADGROUP_SINGLE = new RedisCommand>>("XREADGROUP", + RedisCommand>> XREADGROUP_SINGLE = new RedisCommand>>("XREADGROUP", new ListMultiDecoder( new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(new StreamIdDecoder()), @@ -389,24 +390,29 @@ public interface RedisCommands { new ObjectMapReplayDecoder(), new StreamResultDecoder())); - RedisCommand>>> XCLAIM = new RedisCommand>>>("XCLAIM", + RedisCommand> XCLAIM_IDS = new RedisCommand>("XCLAIM", new StreamIdListDecoder()); + + RedisCommand>> XCLAIM = new RedisCommand>>("XCLAIM", new ListMultiDecoder( new ObjectDecoder(new StreamIdDecoder()), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET), new ObjectMapJoinDecoder())); - RedisCommand>> XREADGROUP_BLOCKING_SINGLE = new RedisCommand>>("XREADGROUP", + RedisCommand>> XREADGROUP_BLOCKING_SINGLE = new RedisCommand>>("XREADGROUP", XREADGROUP_SINGLE.getReplayMultiDecoder()); Set BLOCKING_COMMANDS = new HashSet(Arrays.asList( XREAD_BLOCKING_SINGLE, XREAD_BLOCKING, XREADGROUP_BLOCKING_SINGLE, XREADGROUP_BLOCKING)); - RedisStrictCommand XADD = new RedisStrictCommand("XADD", new StreamIdConvertor()); + RedisStrictCommand XADD = new RedisStrictCommand("XADD", new StreamIdConvertor()); RedisStrictCommand XGROUP = new RedisStrictCommand("XGROUP", new VoidReplayConvertor()); + RedisStrictCommand XGROUP_LONG = new RedisStrictCommand("XGROUP"); RedisStrictCommand XADD_VOID = new RedisStrictCommand("XADD", new VoidReplayConvertor()); RedisStrictCommand XLEN = new RedisStrictCommand("XLEN"); RedisStrictCommand XACK = new RedisStrictCommand("XACK"); + RedisStrictCommand XDEL = new RedisStrictCommand("XDEL"); + RedisStrictCommand XTRIM = new RedisStrictCommand("XTRIM"); RedisCommand XPENDING = new RedisCommand("XPENDING", new ListMultiDecoder(new ObjectListReplayDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new PendingResultDecoder())); RedisCommand XPENDING_ENTRIES = new RedisCommand("XPENDING", diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/StreamIdConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/StreamIdConvertor.java index 8e508dab5..e37a1485a 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/StreamIdConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/StreamIdConvertor.java @@ -15,19 +15,19 @@ */ package org.redisson.client.protocol.convertor; -import org.redisson.api.StreamId; +import org.redisson.api.StreamMessageId; /** * * @author Nikita Koksharov * */ -public class StreamIdConvertor extends SingleConvertor { +public class StreamIdConvertor extends SingleConvertor { @Override - public StreamId convert(Object id) { + public StreamMessageId convert(Object id) { String[] parts = id.toString().split("-"); - return new StreamId(Long.valueOf(parts[0]), Long.valueOf(parts[1])); + return new StreamMessageId(Long.valueOf(parts[0]), Long.valueOf(parts[1])); } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamIdDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamIdDecoder.java index ef4a31ae7..ccb70788f 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamIdDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamIdDecoder.java @@ -17,7 +17,7 @@ package org.redisson.client.protocol.decoder; import java.io.IOException; -import org.redisson.api.StreamId; +import org.redisson.api.StreamMessageId; import org.redisson.client.codec.StringCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; @@ -35,7 +35,7 @@ public class StreamIdDecoder implements Decoder { public Object decode(ByteBuf buf, State state) throws IOException { String id = (String) StringCodec.INSTANCE.getValueDecoder().decode(buf, state); String[] parts = id.toString().split("-"); - return new StreamId(Long.valueOf(parts[0]), Long.valueOf(parts[1])); + return new StreamMessageId(Long.valueOf(parts[0]), Long.valueOf(parts[1])); } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamIdListDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamIdListDecoder.java new file mode 100644 index 000000000..b17f1082f --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamIdListDecoder.java @@ -0,0 +1,50 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.util.ArrayList; +import java.util.List; + +import org.redisson.api.StreamMessageId; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.convertor.StreamIdConvertor; + +/** + * + * @author Nikita Koksharov + * + */ +public class StreamIdListDecoder implements MultiDecoder> { + + private final StreamIdConvertor convertor = new StreamIdConvertor(); + + @Override + public Decoder getDecoder(int paramNum, State state) { + return null; + } + + @Override + public List decode(List parts, State state) { + List ids = new ArrayList(); + for (Object id : parts) { + StreamMessageId streamMessageId = convertor.convert(id); + ids.add(streamMessageId); + } + return ids; + } + +} diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamResultDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamResultDecoder.java index ef0ffb138..b3de4b388 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamResultDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamResultDecoder.java @@ -19,7 +19,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.redisson.api.StreamId; +import org.redisson.api.StreamMessageId; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; @@ -33,7 +33,7 @@ public class StreamResultDecoder implements MultiDecoder { @Override public Object decode(List parts, State state) { if (!parts.isEmpty()) { - Map>> result = (Map>>) parts.get(0); + Map>> result = (Map>>) parts.get(0); return result.values().iterator().next(); } return Collections.emptyMap(); diff --git a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java index 599746d34..d933c8f56 100644 --- a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java +++ b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java @@ -88,89 +88,96 @@ public class DNSMonitor { return; } - final AtomicInteger counter = new AtomicInteger(masters.size() + slaves.size()); - for (final Entry entry : masters.entrySet()) { - Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort())); - resolveFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (counter.decrementAndGet() == 0) { - monitorDnsChange(); - } + AtomicInteger counter = new AtomicInteger(masters.size() + slaves.size()); + monitorMasters(counter); + monitorSlaves(counter); + } - if (!future.isSuccess()) { - log.error("Unable to resolve " + entry.getKey().getHost(), future.cause()); - return; - } - - InetSocketAddress currentMasterAddr = entry.getValue(); - InetSocketAddress newMasterAddr = future.getNow(); - if (!newMasterAddr.getAddress().equals(currentMasterAddr.getAddress())) { - log.info("Detected DNS change. Master {} has changed ip from {} to {}", - entry.getKey(), currentMasterAddr.getAddress().getHostAddress(), newMasterAddr.getAddress().getHostAddress()); - MasterSlaveEntry masterSlaveEntry = connectionManager.getEntry(currentMasterAddr); - if (masterSlaveEntry == null) { - log.error("Unable to find master entry for {}", currentMasterAddr); - return; - } - masterSlaveEntry.changeMaster(newMasterAddr, entry.getKey()); - masters.put(entry.getKey(), newMasterAddr); - } + }, dnsMonitoringInterval, TimeUnit.MILLISECONDS); + } + + private void monitorMasters(final AtomicInteger counter) { + for (final Entry entry : masters.entrySet()) { + Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort())); + resolveFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (counter.decrementAndGet() == 0) { + monitorDnsChange(); + } + + if (!future.isSuccess()) { + log.error("Unable to resolve " + entry.getKey().getHost(), future.cause()); + return; + } + + InetSocketAddress currentMasterAddr = entry.getValue(); + InetSocketAddress newMasterAddr = future.getNow(); + if (!newMasterAddr.getAddress().equals(currentMasterAddr.getAddress())) { + log.info("Detected DNS change. Master {} has changed ip from {} to {}", + entry.getKey(), currentMasterAddr.getAddress().getHostAddress(), newMasterAddr.getAddress().getHostAddress()); + MasterSlaveEntry masterSlaveEntry = connectionManager.getEntry(currentMasterAddr); + if (masterSlaveEntry == null) { + log.error("Unable to find master entry for {}", currentMasterAddr); + return; } - }); + masterSlaveEntry.changeMaster(newMasterAddr, entry.getKey()); + masters.put(entry.getKey(), newMasterAddr); + } } - - for (final Entry entry : slaves.entrySet()) { - Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort())); - resolveFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (counter.decrementAndGet() == 0) { - monitorDnsChange(); - } + }); + } + } - if (!future.isSuccess()) { - log.error("Unable to resolve " + entry.getKey().getHost(), future.cause()); - return; + private void monitorSlaves(final AtomicInteger counter) { + for (final Entry entry : slaves.entrySet()) { + Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort())); + resolveFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (counter.decrementAndGet() == 0) { + monitorDnsChange(); + } + + if (!future.isSuccess()) { + log.error("Unable to resolve " + entry.getKey().getHost(), future.cause()); + return; + } + + final InetSocketAddress currentSlaveAddr = entry.getValue(); + final InetSocketAddress newSlaveAddr = future.getNow(); + if (!newSlaveAddr.getAddress().equals(currentSlaveAddr.getAddress())) { + log.info("Detected DNS change. Slave {} has changed ip from {} to {}", + entry.getKey().getHost(), currentSlaveAddr.getAddress().getHostAddress(), newSlaveAddr.getAddress().getHostAddress()); + for (final MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) { + if (!masterSlaveEntry.hasSlave(currentSlaveAddr)) { + continue; } - final InetSocketAddress currentSlaveAddr = entry.getValue(); - final InetSocketAddress newSlaveAddr = future.getNow(); - if (!newSlaveAddr.getAddress().equals(currentSlaveAddr.getAddress())) { - log.info("Detected DNS change. Slave {} has changed ip from {} to {}", - entry.getKey().getHost(), currentSlaveAddr.getAddress().getHostAddress(), newSlaveAddr.getAddress().getHostAddress()); - for (final MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) { - if (!masterSlaveEntry.hasSlave(currentSlaveAddr)) { - continue; - } - - if (masterSlaveEntry.hasSlave(newSlaveAddr)) { - masterSlaveEntry.slaveUp(newSlaveAddr, FreezeReason.MANAGER); + if (masterSlaveEntry.hasSlave(newSlaveAddr)) { + masterSlaveEntry.slaveUp(newSlaveAddr, FreezeReason.MANAGER); + masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER); + } else { + RFuture addFuture = masterSlaveEntry.addSlave(newSlaveAddr, entry.getKey()); + addFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't add slave: " + newSlaveAddr, future.cause()); + return; + } + masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER); - } else { - RFuture addFuture = masterSlaveEntry.addSlave(newSlaveAddr, entry.getKey()); - addFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - log.error("Can't add slave: " + newSlaveAddr, future.cause()); - return; - } - - masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER); - } - }); } - break; - } - slaves.put(entry.getKey(), newSlaveAddr); + }); } + break; } - }); + slaves.put(entry.getKey(), newSlaveAddr); + } } - } - - }, dnsMonitoringInterval, TimeUnit.MILLISECONDS); + }); + } } diff --git a/redisson/src/test/java/org/redisson/RedissonStreamTest.java b/redisson/src/test/java/org/redisson/RedissonStreamTest.java index 924538652..8aa296928 100644 --- a/redisson/src/test/java/org/redisson/RedissonStreamTest.java +++ b/redisson/src/test/java/org/redisson/RedissonStreamTest.java @@ -12,10 +12,79 @@ import org.junit.Test; import org.redisson.api.PendingEntry; import org.redisson.api.PendingResult; import org.redisson.api.RStream; -import org.redisson.api.StreamId; +import org.redisson.api.StreamMessageId; +import org.redisson.client.RedisException; public class RedissonStreamTest extends BaseTest { + @Test + public void testUpdateGroupMessageId() { + RStream stream = redisson.getStream("test"); + + StreamMessageId id = stream.add("0", "0"); + + stream.createGroup("testGroup"); + + StreamMessageId id1 = stream.add("1", "1"); + System.out.println("id1 " + id1); + StreamMessageId id2 = stream.add("2", "2"); + System.out.println("id2 " + id2); + + Map> s = stream.readGroup("testGroup", "consumer1"); + assertThat(s.size()).isEqualTo(2); + + stream.updateGroupMessageId("testGroup", id); + + Map> s2 = stream.readGroup("testGroup", "consumer2"); + assertThat(s2.size()).isEqualTo(2); + } + + @Test + public void testRemoveConsumer() { + RStream stream = redisson.getStream("test"); + + stream.add("0", "0"); + + stream.createGroup("testGroup"); + + StreamMessageId id1 = stream.add("1", "1"); + StreamMessageId id2 = stream.add("2", "2"); + + Map> s = stream.readGroup("testGroup", "consumer1"); + assertThat(s.size()).isEqualTo(2); + + assertThat(stream.removeConsumer("testGroup", "consumer1")).isEqualTo(2); + assertThat(stream.removeConsumer("testGroup", "consumer2")).isZero(); + } + + @Test(expected = RedisException.class) + public void testRemoveGroup() { + RStream stream = redisson.getStream("test"); + + stream.add("0", "0"); + + stream.createGroup("testGroup"); + + StreamMessageId id1 = stream.add("1", "1"); + StreamMessageId id2 = stream.add("2", "2"); + + stream.removeGroup("testGroup"); + + stream.readGroup("testGroup", "consumer1"); + } + + @Test + public void testRemoveMessages() { + RStream stream = redisson.getStream("test"); + + StreamMessageId id1 = stream.add("0", "0"); + StreamMessageId id2 = stream.add("1", "1"); + assertThat(stream.size()).isEqualTo(2); + + assertThat(stream.remove(id1, id2)).isEqualTo(2); + assertThat(stream.size()).isZero(); + } + @Test public void testClaim() { RStream stream = redisson.getStream("test"); @@ -24,19 +93,19 @@ public class RedissonStreamTest extends BaseTest { stream.createGroup("testGroup"); - StreamId id1 = stream.add("1", "1"); - StreamId id2 = stream.add("2", "2"); + StreamMessageId id1 = stream.add("1", "1"); + StreamMessageId id2 = stream.add("2", "2"); - Map> s = stream.readGroup("testGroup", "consumer1"); + Map> s = stream.readGroup("testGroup", "consumer1"); assertThat(s.size()).isEqualTo(2); - StreamId id3 = stream.add("3", "33"); - StreamId id4 = stream.add("4", "44"); + StreamMessageId id3 = stream.add("3", "33"); + StreamMessageId id4 = stream.add("4", "44"); - Map> s2 = stream.readGroup("testGroup", "consumer2"); + Map> s2 = stream.readGroup("testGroup", "consumer2"); assertThat(s2.size()).isEqualTo(2); - Map> res = stream.claimPending("testGroup", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4); + Map> res = stream.claim("testGroup", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4); assertThat(res.size()).isEqualTo(2); assertThat(res.keySet()).containsExactly(id3, id4); for (Map map : res.values()) { @@ -45,6 +114,31 @@ public class RedissonStreamTest extends BaseTest { } } + @Test + public void testClaimIds() { + RStream stream = redisson.getStream("test"); + + stream.add("0", "0"); + + stream.createGroup("testGroup"); + + StreamMessageId id1 = stream.add("1", "1"); + StreamMessageId id2 = stream.add("2", "2"); + + Map> s = stream.readGroup("testGroup", "consumer1"); + assertThat(s.size()).isEqualTo(2); + + StreamMessageId id3 = stream.add("3", "33"); + StreamMessageId id4 = stream.add("4", "44"); + + Map> s2 = stream.readGroup("testGroup", "consumer2"); + assertThat(s2.size()).isEqualTo(2); + + List res = stream.fastClaim("testGroup", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4); + assertThat(res.size()).isEqualTo(2); + assertThat(res).containsExactly(id3, id4); + } + @Test public void testPending() { RStream stream = redisson.getStream("test"); @@ -53,16 +147,16 @@ public class RedissonStreamTest extends BaseTest { stream.createGroup("testGroup"); - StreamId id1 = stream.add("1", "1"); - StreamId id2 = stream.add("2", "2"); + StreamMessageId id1 = stream.add("1", "1"); + StreamMessageId id2 = stream.add("2", "2"); - Map> s = stream.readGroup("testGroup", "consumer1"); + Map> s = stream.readGroup("testGroup", "consumer1"); assertThat(s.size()).isEqualTo(2); - StreamId id3 = stream.add("3", "3"); - StreamId id4 = stream.add("4", "4"); + StreamMessageId id3 = stream.add("3", "3"); + StreamMessageId id4 = stream.add("4", "4"); - Map> s2 = stream.readGroup("testGroup", "consumer2"); + Map> s2 = stream.readGroup("testGroup", "consumer2"); assertThat(s2.size()).isEqualTo(2); PendingResult pi = stream.listPending("testGroup"); @@ -71,7 +165,7 @@ public class RedissonStreamTest extends BaseTest { assertThat(pi.getTotal()).isEqualTo(4); assertThat(pi.getConsumerNames().keySet()).containsExactly("consumer1", "consumer2"); - List list = stream.listPending("testGroup", StreamId.MIN, StreamId.MAX, 10); + List list = stream.listPending("testGroup", StreamMessageId.MIN, StreamMessageId.MAX, 10); assertThat(list.size()).isEqualTo(4); for (PendingEntry pendingEntry : list) { assertThat(pendingEntry.getId()).isIn(id1, id2, id3, id4); @@ -79,7 +173,7 @@ public class RedissonStreamTest extends BaseTest { assertThat(pendingEntry.getLastTimeDelivered()).isOne(); } - List list2 = stream.listPending("testGroup", StreamId.MIN, StreamId.MAX, 10, "consumer1"); + List list2 = stream.listPending("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 10); assertThat(list2.size()).isEqualTo(2); for (PendingEntry pendingEntry : list2) { assertThat(pendingEntry.getId()).isIn(id1, id2); @@ -96,10 +190,10 @@ public class RedissonStreamTest extends BaseTest { stream.createGroup("testGroup"); - StreamId id1 = stream.add("1", "1"); - StreamId id2 = stream.add("2", "2"); + StreamMessageId id1 = stream.add("1", "1"); + StreamMessageId id2 = stream.add("2", "2"); - Map> s = stream.readGroup("testGroup", "consumer1"); + Map> s = stream.readGroup("testGroup", "consumer1"); assertThat(s.size()).isEqualTo(2); assertThat(stream.ack("testGroup", id1, id2)).isEqualTo(2); @@ -109,7 +203,7 @@ public class RedissonStreamTest extends BaseTest { public void testReadGroup() { RStream stream = redisson.getStream("test"); - StreamId id0 = stream.add("0", "0"); + StreamMessageId id0 = stream.add("0", "0"); stream.createGroup("testGroup", id0); @@ -117,7 +211,7 @@ public class RedissonStreamTest extends BaseTest { stream.add("2", "2"); stream.add("3", "3"); - Map> s = stream.readGroup("testGroup", "consumer1"); + Map> s = stream.readGroup("testGroup", "consumer1"); assertThat(s.values().iterator().next().keySet()).containsAnyOf("1", "2", "3"); assertThat(s.size()).isEqualTo(3); @@ -125,14 +219,14 @@ public class RedissonStreamTest extends BaseTest { stream.add("2", "2"); stream.add("3", "3"); - Map> s1 = stream.readGroup("testGroup", "consumer1", 1); + Map> s1 = stream.readGroup("testGroup", "consumer1", 1); assertThat(s1.size()).isEqualTo(1); - StreamId id = stream.add("1", "1"); + StreamMessageId id = stream.add("1", "1"); stream.add("2", "2"); stream.add("3", "3"); - Map> s2 = stream.readGroup("testGroup", "consumer1", id); + Map> s2 = stream.readGroup("testGroup", "consumer1", id); assertThat(s2.size()).isEqualTo(2); } @@ -144,18 +238,18 @@ public class RedissonStreamTest extends BaseTest { Map entries1 = new HashMap<>(); entries1.put("1", "11"); entries1.put("3", "31"); - stream.addAll(new StreamId(1), entries1, 1, false); + stream.addAll(new StreamMessageId(1), entries1, 1, false); assertThat(stream.size()).isEqualTo(1); Map entries2 = new HashMap<>(); entries2.put("5", "55"); entries2.put("7", "77"); - stream.addAll(new StreamId(2), entries2, 1, false); + stream.addAll(new StreamMessageId(2), entries2, 1, false); - Map> r2 = stream.rangeReversed(10, StreamId.MAX, StreamId.MIN); - assertThat(r2.keySet()).containsExactly(new StreamId(2), new StreamId(1)); - assertThat(r2.get(new StreamId(1))).isEqualTo(entries1); - assertThat(r2.get(new StreamId(2))).isEqualTo(entries2); + Map> r2 = stream.rangeReversed(10, StreamMessageId.MAX, StreamMessageId.MIN); + assertThat(r2.keySet()).containsExactly(new StreamMessageId(2), new StreamMessageId(1)); + assertThat(r2.get(new StreamMessageId(1))).isEqualTo(entries1); + assertThat(r2.get(new StreamMessageId(2))).isEqualTo(entries2); } @Test @@ -166,22 +260,22 @@ public class RedissonStreamTest extends BaseTest { Map entries1 = new HashMap<>(); entries1.put("1", "11"); entries1.put("3", "31"); - stream.addAll(new StreamId(1), entries1, 1, false); + stream.addAll(new StreamMessageId(1), entries1, 1, false); assertThat(stream.size()).isEqualTo(1); Map entries2 = new HashMap<>(); entries2.put("5", "55"); entries2.put("7", "77"); - stream.addAll(new StreamId(2), entries2, 1, false); + stream.addAll(new StreamMessageId(2), entries2, 1, false); - Map> r = stream.range(10, new StreamId(0), new StreamId(1)); + Map> r = stream.range(10, new StreamMessageId(0), new StreamMessageId(1)); assertThat(r).hasSize(1); - assertThat(r.get(new StreamId(1))).isEqualTo(entries1); + assertThat(r.get(new StreamMessageId(1))).isEqualTo(entries1); - Map> r2 = stream.range(10, StreamId.MIN, StreamId.MAX); - assertThat(r2.keySet()).containsExactly(new StreamId(1), new StreamId(2)); - assertThat(r2.get(new StreamId(1))).isEqualTo(entries1); - assertThat(r2.get(new StreamId(2))).isEqualTo(entries2); + Map> r2 = stream.range(10, StreamMessageId.MIN, StreamMessageId.MAX); + assertThat(r2.keySet()).containsExactly(new StreamMessageId(1), new StreamMessageId(2)); + assertThat(r2.get(new StreamMessageId(1))).isEqualTo(entries1); + assertThat(r2.get(new StreamMessageId(2))).isEqualTo(entries2); } @Test @@ -201,16 +295,16 @@ public class RedissonStreamTest extends BaseTest { e.printStackTrace(); } - stream.addAll(new StreamId(1), entries1); + stream.addAll(new StreamMessageId(1), entries1); } }; t.start(); long start = System.currentTimeMillis(); - Map>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamId(0), "test1", StreamId.NEWEST); + Map>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamMessageId(0), "test1", StreamMessageId.NEWEST); assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L); assertThat(s).hasSize(1); - assertThat(s.get("test").get(new StreamId(1))).isEqualTo(entries1); + assertThat(s.get("test").get(new StreamMessageId(1))).isEqualTo(entries1); } @Test @@ -230,16 +324,16 @@ public class RedissonStreamTest extends BaseTest { e.printStackTrace(); } - stream.addAll(new StreamId(1), entries1); + stream.addAll(new StreamMessageId(1), entries1); } }; t.start(); long start = System.currentTimeMillis(); - Map> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamId(0)); + Map> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamMessageId(0)); assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L); assertThat(s).hasSize(1); - assertThat(s.get(new StreamId(1))).isEqualTo(entries1); + assertThat(s.get(new StreamMessageId(1))).isEqualTo(entries1); } @Test @@ -250,20 +344,20 @@ public class RedissonStreamTest extends BaseTest { Map entries1 = new HashMap<>(); entries1.put("1", "11"); entries1.put("3", "31"); - stream.addAll(new StreamId(1), entries1, 1, false); + stream.addAll(new StreamMessageId(1), entries1, 1, false); assertThat(stream.size()).isEqualTo(1); Map entries2 = new HashMap<>(); entries2.put("5", "55"); entries2.put("7", "77"); - stream.addAll(new StreamId(2), entries2, 1, false); + stream.addAll(new StreamMessageId(2), entries2, 1, false); assertThat(stream.size()).isEqualTo(2); } @Test public void testReadMultiKeysEmpty() { RStream stream = redisson.getStream("test2"); - Map>> s = stream.read(10, new StreamId(0), "test1", new StreamId(0)); + Map>> s = stream.read(10, new StreamMessageId(0), "test1", new StreamMessageId(0)); assertThat(s).isEmpty(); } @@ -282,7 +376,7 @@ public class RedissonStreamTest extends BaseTest { entries2.put("6", "66"); stream2.addAll(entries2); - Map>> s = stream2.read(10, new StreamId(0), "test1", new StreamId(0)); + Map>> s = stream2.read(10, new StreamMessageId(0), "test1", new StreamMessageId(0)); assertThat(s).hasSize(2); assertThat(s.get("test1").values().iterator().next()).isEqualTo(entries1); assertThat(s.get("test2").values().iterator().next()).isEqualTo(entries2); @@ -295,24 +389,24 @@ public class RedissonStreamTest extends BaseTest { Map entries1 = new LinkedHashMap<>(); entries1.put("1", "11"); entries1.put("3", "31"); - stream.addAll(new StreamId(1), entries1, 1, false); + stream.addAll(new StreamMessageId(1), entries1, 1, false); Map entries2 = new LinkedHashMap<>(); entries2.put("5", "55"); entries2.put("7", "77"); - stream.addAll(new StreamId(2), entries2, 1, false); + stream.addAll(new StreamMessageId(2), entries2, 1, false); Map entries3 = new LinkedHashMap<>(); entries3.put("15", "05"); entries3.put("17", "07"); - stream.addAll(new StreamId(3), entries3, 1, false); + stream.addAll(new StreamMessageId(3), entries3, 1, false); - Map> result = stream.read(10, new StreamId(0, 0)); + Map> result = stream.read(10, new StreamMessageId(0, 0)); assertThat(result).hasSize(3); - assertThat(result.get(new StreamId(4))).isNull(); - assertThat(result.get(new StreamId(1))).isEqualTo(entries1); - assertThat(result.get(new StreamId(2))).isEqualTo(entries2); - assertThat(result.get(new StreamId(3))).isEqualTo(entries3); + assertThat(result.get(new StreamMessageId(4))).isNull(); + assertThat(result.get(new StreamMessageId(1))).isEqualTo(entries1); + assertThat(result.get(new StreamMessageId(2))).isEqualTo(entries2); + assertThat(result.get(new StreamMessageId(3))).isEqualTo(entries3); } @Test @@ -321,25 +415,25 @@ public class RedissonStreamTest extends BaseTest { Map entries1 = new LinkedHashMap<>(); entries1.put("1", "11"); entries1.put("3", "31"); - stream.addAll(new StreamId(1), entries1, 1, true); + stream.addAll(new StreamMessageId(1), entries1, 1, true); - Map> result = stream.read(10, new StreamId(0, 0)); + Map> result = stream.read(10, new StreamMessageId(0, 0)); assertThat(result).hasSize(1); - assertThat(result.get(new StreamId(4))).isNull(); - assertThat(result.get(new StreamId(1))).isEqualTo(entries1); + assertThat(result.get(new StreamMessageId(4))).isNull(); + assertThat(result.get(new StreamMessageId(1))).isEqualTo(entries1); } @Test public void testReadEmpty() { RStream stream2 = redisson.getStream("test"); - Map> result2 = stream2.read(10, new StreamId(0, 0)); + Map> result2 = stream2.read(10, new StreamMessageId(0, 0)); assertThat(result2).isEmpty(); } @Test public void testAdd() { RStream stream = redisson.getStream("test1"); - StreamId s = stream.add("12", "33"); + StreamMessageId s = stream.add("12", "33"); assertThat(s.getId0()).isNotNegative(); assertThat(s.getId1()).isNotNegative(); assertThat(stream.size()).isEqualTo(1); @@ -353,13 +447,13 @@ public class RedissonStreamTest extends BaseTest { Map entries = new HashMap<>(); entries.put("6", "61"); entries.put("4", "41"); - stream.addAll(new StreamId(12, 42), entries, 10, false); + stream.addAll(new StreamMessageId(12, 42), entries, 10, false); assertThat(stream.size()).isEqualTo(1); entries.clear(); entries.put("1", "11"); entries.put("3", "31"); - stream.addAll(new StreamId(Long.MAX_VALUE), entries, 1, false); + stream.addAll(new StreamMessageId(Long.MAX_VALUE), entries, 1, false); assertThat(stream.size()).isEqualTo(2); }