diff --git a/redisson/src/main/java/org/redisson/RedissonStream.java b/redisson/src/main/java/org/redisson/RedissonStream.java index eb383091d..a76490d10 100644 --- a/redisson/src/main/java/org/redisson/RedissonStream.java +++ b/redisson/src/main/java/org/redisson/RedissonStream.java @@ -225,6 +225,185 @@ public class RedissonStream extends RedissonExpirable implements RStream>> readGroup(String groupName, String consumerName, StreamMessageId id, Map keyToId) { + return readGroup(groupName, consumerName, 0, id, keyToId); + } + + @Override + public RFuture>>> readGroupAsync(String groupName, String consumerName,StreamMessageId id, Map keyToId) { + return readGroupAsync(groupName, consumerName, 0, id, keyToId); + } + + @Override + public Map>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, Map keyToId) { + return get(readGroupAsync(groupName, consumerName, count, id, keyToId)); + } + + @Override + public RFuture>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, Map keyToId) { + return readGroupAsync(groupName, consumerName, count, -1, null, id, keyToId); + } + + @Override + public RFuture>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2) { + return readGroupAsync(groupName, consumerName, count, timeout, unit, id, Collections.singletonMap(key2, id2)); + } + + @Override + public RFuture>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2, String key3, StreamMessageId id3) { + Map params = new HashMap(2); + params.put(key2, id2); + params.put(key3, id3); + return readGroupAsync(groupName, consumerName, count, timeout, unit, id, params); + } + + @Override + public RFuture>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map keyToId) { + return readGroupAsync(groupName, consumerName, 0, timeout, unit, id, keyToId); + } + + @Override + public Map>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, Map keyToId) { + return get(readGroupAsync(groupName, consumerName, count, timeout, unit, id, keyToId)); + } + + @Override + public Map>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map keyToId) { + return readGroup(groupName, consumerName, 0, timeout, unit, id, keyToId); + } + + @Override + public RFuture>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2) { + return readGroupAsync(groupName, consumerName, id, Collections.singletonMap(key2, id2)); + } + + @Override + public RFuture>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3, + StreamMessageId id3) { + Map params = new HashMap(2); + params.put(key2, id2); + params.put(key3, id3); + return readGroupAsync(groupName, consumerName, id, params); + } + + @Override + public RFuture>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2) { + return readGroupAsync(groupName, consumerName, count, id, Collections.singletonMap(key2, id2)); + } + + @Override + public RFuture>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2, + String key3, StreamMessageId id3) { + Map params = new HashMap(2); + params.put(key2, id2); + params.put(key3, id3); + return readGroupAsync(groupName, consumerName, count, id, params); + } + + @Override + public RFuture>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2) { + return readGroupAsync(groupName, consumerName, timeout, unit, id, Collections.singletonMap(key2, id2)); + } + + @Override + public RFuture>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2, String key3, StreamMessageId id3) { + Map params = new HashMap(2); + params.put(key2, id2); + params.put(key3, id3); + return readGroupAsync(groupName, consumerName, timeout, unit, id, params); + } + + @Override + public Map>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2) { + return get(readGroupAsync(groupName, consumerName, id, key2, id2)); + } + + @Override + public Map>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3, + StreamMessageId id3) { + return get(readGroupAsync(groupName, consumerName, id, key2, id2, key3, id3)); + } + + @Override + public Map>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2) { + return get(readGroupAsync(groupName, consumerName, count, id, key2, id2)); + } + + @Override + public Map>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2, String key3, + StreamMessageId id3) { + return get(readGroupAsync(groupName, consumerName, count, id, key2, id2, key3, id3)); + } + + @Override + public Map>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2) { + return get(readGroupAsync(groupName, consumerName, timeout, unit, id, key2, id2)); + } + + @Override + public Map>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2, String key3, StreamMessageId id3) { + return get(readGroupAsync(groupName, consumerName, timeout, unit, id, key2, id2, key3, id3)); + } + + @Override + public Map>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2) { + return get(readGroupAsync(groupName, consumerName, count, timeout, unit, id, key2, id2)); + } + + @Override + public Map>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2, String key3, StreamMessageId id3) { + return get(readGroupAsync(groupName, consumerName, count, timeout, unit, id, key2, id2, key3, id3)); + } + + public RFuture>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, + StreamMessageId id, Map keyToId) { + List params = new ArrayList(); + params.add("GROUP"); + params.add(groupName); + params.add(consumerName); + + if (count > 0) { + params.add("COUNT"); + params.add(count); + } + + if (timeout > 0) { + params.add("BLOCK"); + params.add(toSeconds(timeout, unit)*1000); + } + + params.add("STREAMS"); + params.add(getName()); + for (String key : keyToId.keySet()) { + params.add(key); + } + + if (id == null) { + params.add(">"); + } else { + params.add(id); + } + + for (StreamMessageId nextId : keyToId.values()) { + params.add(nextId.toString()); + } + + if (timeout > 0) { + return commandExecutor.readAsync(getName(), codec, RedisCommands.XREADGROUP_BLOCKING, params.toArray()); + } + return commandExecutor.readAsync(getName(), codec, RedisCommands.XREADGROUP, params.toArray()); + } + @Override public Map> readGroup(String groupName, String consumerName, StreamMessageId... ids) { @@ -330,8 +509,18 @@ public class RedissonStream extends RedissonExpirable implements RStream> read(int count, StreamMessageId ... ids) { - return get(readAsync(count, ids)); + public Map>> read(StreamMessageId id, Map keyToId) { + return read(0, id, keyToId); + } + + @Override + public RFuture>>> readAsync(StreamMessageId id, Map keyToId) { + return readAsync(0, id, keyToId); + } + + @Override + public Map>> read(int count, StreamMessageId id, Map keyToId) { + return get(readAsync(count, id, keyToId)); } @Override @@ -340,10 +529,124 @@ public class RedissonStream extends RedissonExpirable implements RStream> read(int count, long timeout, TimeUnit unit, StreamMessageId... ids) { - return get(readAsync(count, timeout, unit, ids)); + public RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2) { + return readAsync(count, timeout, unit, id, Collections.singletonMap(key2, id2)); } + @Override + public RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2, String key3, StreamMessageId id3) { + Map params = new HashMap(2); + params.put(key2, id2); + params.put(key3, id3); + return readAsync(count, timeout, unit, id, params); + } + + @Override + public RFuture>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, Map keyToId) { + return readAsync(0, timeout, unit, id, keyToId); + } + + @Override + public Map>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map keyToId) { + return get(readAsync(count, timeout, unit, id, keyToId)); + } + + @Override + public Map>> read(long timeout, TimeUnit unit, StreamMessageId id, Map keyToId) { + return read(0, timeout, unit, id, keyToId); + } + + @Override + public RFuture>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2) { + return readAsync(id, Collections.singletonMap(key2, id2)); + } + + @Override + public RFuture>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2, String key3, + StreamMessageId id3) { + Map params = new HashMap(2); + params.put(key2, id2); + params.put(key3, id3); + return readAsync(id, params); + } + + @Override + public RFuture>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2) { + return readAsync(count, id, Collections.singletonMap(key2, id2)); + } + + @Override + public RFuture>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2, + String key3, StreamMessageId id3) { + Map params = new HashMap(2); + params.put(key2, id2); + params.put(key3, id3); + return readAsync(count, id, params); + } + + @Override + public RFuture>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2) { + return readAsync(timeout, unit, id, Collections.singletonMap(key2, id2)); + } + + @Override + public RFuture>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2, String key3, StreamMessageId id3) { + Map params = new HashMap(2); + params.put(key2, id2); + params.put(key3, id3); + return readAsync(timeout, unit, id, params); + } + + @Override + public Map>> read(StreamMessageId id, String key2, StreamMessageId id2) { + return get(readAsync(id, key2, id2)); + } + + @Override + public Map>> read(StreamMessageId id, String key2, StreamMessageId id2, String key3, + StreamMessageId id3) { + return get(readAsync(id, key2, id2, key3, id3)); + } + + @Override + public Map>> read(int count, StreamMessageId id, String key2, StreamMessageId id2) { + return get(readAsync(count, id, key2, id2)); + } + + @Override + public Map>> read(int count, StreamMessageId id, String key2, StreamMessageId id2, String key3, + StreamMessageId id3) { + return get(readAsync(count, id, key2, id2, key3, id3)); + } + + @Override + public Map>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2) { + return get(readAsync(timeout, unit, id, key2, id2)); + } + + @Override + public Map>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2, String key3, StreamMessageId id3) { + return get(readAsync(timeout, unit, id, key2, id2, key3, id3)); + } + + @Override + public Map>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2) { + return get(readAsync(count, timeout, unit, id, key2, id2)); + } + + @Override + public Map>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2, String key3, StreamMessageId id3) { + return get(readAsync(count, timeout, unit, id, key2, id2, key3, id3)); + } + @Override public RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, Map keyToId) { List params = new ArrayList(); @@ -448,6 +751,16 @@ public class RedissonStream extends RedissonExpirable implements RStream>> readAsync(int count, StreamMessageId... ids) { return readAsync(count, 0, null, ids); } + + @Override + public Map> read(int count, long timeout, TimeUnit unit, StreamMessageId... ids) { + return get(readAsync(count, timeout, unit, ids)); + } + + @Override + public Map> read(int count, StreamMessageId ... ids) { + return get(readAsync(count, ids)); + } @Override public RFuture>> readAsync(int count, long timeout, TimeUnit unit, @@ -476,16 +789,6 @@ public class RedissonStream extends RedissonExpirable implements RStream>> read(int count, StreamMessageId id, Map keyToId) { - return get(readAsync(count, id, keyToId)); - } - - @Override - public Map>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map keyToId) { - return get(readAsync(count, timeout, unit, id, keyToId)); - } - @Override public RFuture>> rangeAsync(int count, StreamMessageId startId, StreamMessageId endId) { List params = new LinkedList(); @@ -536,16 +839,6 @@ public class RedissonStream extends RedissonExpirable implements RStream>>> readAsync(StreamMessageId id, Map keyToId) { - return readAsync(0, id, keyToId); - } - - @Override - public RFuture>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, Map keyToId) { - return readAsync(0, timeout, unit, id, keyToId); - } - @Override public RFuture>> rangeAsync(StreamMessageId startId, StreamMessageId endId) { return rangeAsync(0, startId, endId); @@ -566,16 +859,6 @@ public class RedissonStream extends RedissonExpirable implements RStream>> read(StreamMessageId id, Map keyToId) { - return read(0, id, keyToId); - } - - @Override - public Map>> read(long timeout, TimeUnit unit, StreamMessageId id, Map keyToId) { - return read(0, timeout, unit, id, keyToId); - } - @Override public Map> range(StreamMessageId startId, StreamMessageId endId) { return range(0, startId, endId); @@ -586,110 +869,6 @@ public class RedissonStream extends RedissonExpirable implements RStream>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2) { - return readAsync(id, Collections.singletonMap(key2, id2)); - } - - @Override - public RFuture>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2, String key3, - StreamMessageId id3) { - Map params = new HashMap(2); - params.put(key2, id2); - params.put(key3, id3); - return readAsync(id, params); - } - - @Override - public RFuture>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2) { - return readAsync(count, id, Collections.singletonMap(key2, id2)); - } - - @Override - public RFuture>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2, - String key3, StreamMessageId id3) { - Map params = new HashMap(2); - params.put(key2, id2); - params.put(key3, id3); - return readAsync(count, id, params); - } - - @Override - public RFuture>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, - String key2, StreamMessageId id2) { - return readAsync(timeout, unit, id, Collections.singletonMap(key2, id2)); - } - - @Override - public RFuture>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, - String key2, StreamMessageId id2, String key3, StreamMessageId id3) { - Map params = new HashMap(2); - params.put(key2, id2); - params.put(key3, id3); - return readAsync(timeout, unit, id, params); - } - - @Override - public RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, - String key2, StreamMessageId id2) { - return readAsync(count, timeout, unit, id, Collections.singletonMap(key2, id2)); - } - - @Override - public RFuture>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, - String key2, StreamMessageId id2, String key3, StreamMessageId id3) { - Map params = new HashMap(2); - params.put(key2, id2); - params.put(key3, id3); - return readAsync(count, timeout, unit, id, params); - } - - @Override - public Map>> read(StreamMessageId id, String key2, StreamMessageId id2) { - return get(readAsync(id, key2, id2)); - } - - @Override - public Map>> read(StreamMessageId id, String key2, StreamMessageId id2, String key3, - StreamMessageId id3) { - return get(readAsync(id, key2, id2, key3, id3)); - } - - @Override - public Map>> read(int count, StreamMessageId id, String key2, StreamMessageId id2) { - return get(readAsync(count, id, key2, id2)); - } - - @Override - public Map>> read(int count, StreamMessageId id, String key2, StreamMessageId id2, String key3, - StreamMessageId id3) { - return get(readAsync(count, id, key2, id2, key3, id3)); - } - - @Override - public Map>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2, - StreamMessageId id2) { - return get(readAsync(timeout, unit, id, key2, id2)); - } - - @Override - public Map>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2, - StreamMessageId id2, String key3, StreamMessageId id3) { - return get(readAsync(timeout, unit, id, key2, id2, key3, id3)); - } - - @Override - public Map>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, - StreamMessageId id2) { - return get(readAsync(count, timeout, unit, id, key2, id2)); - } - - @Override - public Map>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, - StreamMessageId id2, String key3, StreamMessageId id3) { - return get(readAsync(count, timeout, unit, id, key2, id2, key3, id3)); - } - @Override public RFuture removeAsync(StreamMessageId... ids) { List params = new ArrayList(); diff --git a/redisson/src/main/java/org/redisson/api/RStream.java b/redisson/src/main/java/org/redisson/api/RStream.java index 27dc0ff5c..35f4b027f 100644 --- a/redisson/src/main/java/org/redisson/api/RStream.java +++ b/redisson/src/main/java/org/redisson/api/RStream.java @@ -170,7 +170,7 @@ public interface RStream extends RStreamAsync, RExpirable { /** * Read stream data from groupName by consumerName and specified collection of Stream Message IDs. - * Wait for stream data availability for specified timeout interval. + * Waits for stream data availability for specified timeout interval. * * @param groupName - name of group * @param consumerName - name of consumer @@ -183,7 +183,7 @@ public interface RStream extends RStreamAsync, RExpirable { /** * Read stream data from groupName by consumerName and specified collection of Stream Message IDs. - * Wait for stream data availability for specified timeout interval. + * Waits for stream data availability for specified timeout interval. * * @param groupName - name of group * @param consumerName - name of consumer @@ -194,6 +194,184 @@ public interface RStream extends RStreamAsync, RExpirable { * @return stream data mapped by Stream Message ID */ Map> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param id - starting message id for this stream + * @param nameToId - Stream Message ID mapped by stream name + * @return stream data mapped by key and Stream Message ID + */ + Map>> readGroup(String groupName, String consumerName, StreamMessageId id, Map nameToId); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param id - starting message id for this stream + * @param nameToId - Stream Message ID mapped by stream name + * @return stream data mapped by key and Stream Message ID + */ + Map>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, Map nameToId); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * Waits for the first stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - starting message id for this stream + * @param nameToId - Stream Message ID mapped by stream name + * @return stream data mapped by key and Stream Message ID + */ + Map>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, Map nameToId); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * Waits for the first stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - starting message id for this stream + * @param nameToId - Stream Message ID mapped by stream name + * @return stream data mapped by key and Stream Message ID + */ + Map>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map nameToId); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @return stream data mapped by key and Stream Message ID + */ + Map>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @param key3 - name of third stream + * @param id3 - starting message id for third stream + * @return stream data mapped by key and Stream Message ID + */ + Map>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3, + StreamMessageId id3); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @return stream data mapped by key and Stream Message ID + */ + Map>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @param key3 - name of third stream + * @param id3 - starting message id for third stream + * @return stream data mapped by key and Stream Message ID + */ + Map>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2, String key3, + StreamMessageId id3); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * Waits for the first stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @return stream data mapped by key and Stream Message ID + */ + Map>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * Waits for the first stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @param key3 - name of third stream + * @param id3 - starting message id for third stream + * @return stream data mapped by key and Stream Message ID + */ + Map>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2, String key3, StreamMessageId id3); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * Waits for the first stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @return stream data mapped by key and Stream Message ID + */ + Map>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * Waits for the first stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @param key3 - name of third stream + * @param id3 - starting message id for third stream + * @return stream data mapped by key and Stream Message ID + */ + Map>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, + StreamMessageId id2, String key3, StreamMessageId id3); /** * Returns number of entries in stream diff --git a/redisson/src/main/java/org/redisson/api/RStreamAsync.java b/redisson/src/main/java/org/redisson/api/RStreamAsync.java index fa85a3f6c..521704f75 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamAsync.java +++ b/redisson/src/main/java/org/redisson/api/RStreamAsync.java @@ -199,6 +199,168 @@ public interface RStreamAsync extends RExpirableAsync { */ RFuture>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids); + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param id - starting message id for this stream + * @param nameToId - Stream Message ID mapped by stream name + * @return stream data mapped by key and Stream Message ID + */ + RFuture>>> readGroupAsync(String groupName, String consumerName,StreamMessageId id, Map nameToId); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param id - starting message id for this stream + * @param nameToId - Stream Message ID mapped by stream name + * @return stream data mapped by key and Stream Message ID + */ + RFuture>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, Map nameToId); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * Waits for the first stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @return stream data mapped by key and Stream Message ID + */ + RFuture>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * Waits for the first stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @param key3 - name of third stream + * @param id3 - starting message id for third stream + * @return stream data mapped by key and Stream Message ID + */ + RFuture>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2, String key3, StreamMessageId id3); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * Waits for the first stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - starting message id for this stream + * @param nameToId - Stream Message ID mapped by stream name + * @return stream data mapped by key and Stream Message ID + */ + RFuture>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map nameToId); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @return stream data mapped by key and Stream Message ID + */ + RFuture>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @param key3 - name of third stream + * @param id3 - starting message id for third stream + * @return stream data mapped by key and Stream Message ID + */ + RFuture>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3, + StreamMessageId id3); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @return stream data mapped by key and Stream Message ID + */ + RFuture>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @param key3 - name of third stream + * @param id3 - starting message id for third stream + * @return stream data mapped by key and Stream Message ID + */ + RFuture>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2, + String key3, StreamMessageId id3); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * Waits for the first stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @return stream data mapped by key and Stream Message ID + */ + RFuture>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2); + + /** + * Read stream data from groupName by consumerName, starting by specified message ids for this and other streams. + * Waits for the first stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - starting message id for this stream + * @param key2 - name of second stream + * @param id2 - starting message id for second stream + * @param key3 - name of third stream + * @param id3 - starting message id for third stream + * @return stream data mapped by key and Stream Message ID + */ + RFuture>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, + String key2, StreamMessageId id2, String key3, StreamMessageId id3); + /** * Returns number of entries in stream * diff --git a/redisson/src/main/java/org/redisson/api/RTopicRx.java b/redisson/src/main/java/org/redisson/api/RTopicRx.java index edaee5aa4..eedfbf723 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicRx.java +++ b/redisson/src/main/java/org/redisson/api/RTopicRx.java @@ -77,6 +77,7 @@ public interface RTopicRx { /** * Returns stream of messages. * + * @param - type of message * @param type - type of message to listen * @return stream of messages */ diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 31ddabb4f..078e10f30 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -333,8 +333,8 @@ public interface RedisCommands { new ObjectMapReplayDecoder(ListMultiDecoder.RESET), new ObjectMapJoinDecoder())); - RedisCommand>>> XREVRANGE = new RedisCommand>>>("XREVRANGE", - XRANGE.getReplayMultiDecoder()); + RedisCommand>>> XREVRANGE = + new RedisCommand>>>("XREVRANGE", XRANGE.getReplayMultiDecoder()); RedisCommand>>> XREAD = new RedisCommand>>>("XREAD", new ListMultiDecoder( @@ -358,7 +358,8 @@ public interface RedisCommands { new ObjectMapReplayDecoder(), new StreamResultDecoder())); - RedisCommand>> XREAD_BLOCKING_SINGLE = new RedisCommand>>("XREAD", XREAD_SINGLE.getReplayMultiDecoder()); + RedisCommand>> XREAD_BLOCKING_SINGLE = + new RedisCommand>>("XREAD", XREAD_SINGLE.getReplayMultiDecoder()); RedisCommand>>> XREADGROUP = new RedisCommand>>>("XREADGROUP", new ListMultiDecoder( @@ -370,15 +371,8 @@ public interface RedisCommands { new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX), new StreamResultDecoder())); - RedisCommand>> XREADGROUP_BLOCKING = new RedisCommand>>("XREADGROUP", - new ListMultiDecoder( - new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), - new ObjectDecoder(new StreamIdDecoder()), - new ObjectMapReplayDecoder(), - new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1), - new ObjectMapJoinDecoder(), - new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX), - new StreamResultDecoder())); + RedisCommand>>> XREADGROUP_BLOCKING = + new RedisCommand>>>("XREADGROUP", XREADGROUP.getReplayMultiDecoder()); RedisCommand>> XREADGROUP_SINGLE = new RedisCommand>>("XREADGROUP", new ListMultiDecoder( diff --git a/redisson/src/test/java/org/redisson/RedissonStreamTest.java b/redisson/src/test/java/org/redisson/RedissonStreamTest.java index 8aa296928..5032ba940 100644 --- a/redisson/src/test/java/org/redisson/RedissonStreamTest.java +++ b/redisson/src/test/java/org/redisson/RedissonStreamTest.java @@ -2,6 +2,7 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -199,6 +200,30 @@ public class RedissonStreamTest extends BaseTest { assertThat(stream.ack("testGroup", id1, id2)).isEqualTo(2); } + @Test + public void testReadGroupMulti() { + RStream stream1 = redisson.getStream("test1"); + RStream stream2 = redisson.getStream("test2"); + + StreamMessageId id01 = stream1.add("0", "0"); + StreamMessageId id02 = stream2.add("0", "0"); + + stream1.createGroup("testGroup", id01); + stream2.createGroup("testGroup", id02); + + StreamMessageId id11 = stream1.add("1", "1"); + StreamMessageId id12 = stream1.add("2", "2"); + StreamMessageId id13 = stream1.add("3", "3"); + StreamMessageId id21 = stream2.add("1", "1"); + StreamMessageId id22 = stream2.add("2", "2"); + StreamMessageId id23 = stream2.add("3", "3"); + + Map>> s2 = stream1.readGroup("testGroup", "consumer1", id11, Collections.singletonMap("test2", id21)); + assertThat(s2.keySet()).containsExactly("test1", "test2"); + assertThat(s2.get("test1").keySet()).containsExactly(id12, id13); + assertThat(s2.get("test2").keySet()).containsExactly(id22, id23); + } + @Test public void testReadGroup() { RStream stream = redisson.getStream("test");