Merge branch 'master' into 3.0.0

pull/1821/head
Nikita Koksharov 6 years ago
commit 45cf5027aa

@ -225,6 +225,185 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREADGROUP_SINGLE, params.toArray());
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readGroup(groupName, consumerName, 0, id, keyToId);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName,StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readGroupAsync(groupName, consumerName, 0, id, keyToId);
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return get(readGroupAsync(groupName, consumerName, count, id, keyToId));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readGroupAsync(groupName, consumerName, count, -1, null, id, keyToId);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2) {
return readGroupAsync(groupName, consumerName, count, timeout, unit, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readGroupAsync(groupName, consumerName, count, timeout, unit, id, params);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readGroupAsync(groupName, consumerName, 0, timeout, unit, id, keyToId);
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return get(readGroupAsync(groupName, consumerName, count, timeout, unit, id, keyToId));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readGroup(groupName, consumerName, 0, timeout, unit, id, keyToId);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2) {
return readGroupAsync(groupName, consumerName, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readGroupAsync(groupName, consumerName, id, params);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2) {
return readGroupAsync(groupName, consumerName, count, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2,
String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readGroupAsync(groupName, consumerName, count, id, params);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2) {
return readGroupAsync(groupName, consumerName, timeout, unit, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readGroupAsync(groupName, consumerName, timeout, unit, id, params);
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2) {
return get(readGroupAsync(groupName, consumerName, id, key2, id2));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
return get(readGroupAsync(groupName, consumerName, id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2) {
return get(readGroupAsync(groupName, consumerName, count, id, key2, id2));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
return get(readGroupAsync(groupName, consumerName, count, id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2) {
return get(readGroupAsync(groupName, consumerName, timeout, unit, id, key2, id2));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2, String key3, StreamMessageId id3) {
return get(readGroupAsync(groupName, consumerName, timeout, unit, id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2) {
return get(readGroupAsync(groupName, consumerName, count, timeout, unit, id, key2, id2));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2, String key3, StreamMessageId id3) {
return get(readGroupAsync(groupName, consumerName, count, timeout, unit, id, key2, id2, key3, id3));
}
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit,
StreamMessageId id, Map<String, StreamMessageId> keyToId) {
List<Object> params = new ArrayList<Object>();
params.add("GROUP");
params.add(groupName);
params.add(consumerName);
if (count > 0) {
params.add("COUNT");
params.add(count);
}
if (timeout > 0) {
params.add("BLOCK");
params.add(toSeconds(timeout, unit)*1000);
}
params.add("STREAMS");
params.add(getName());
for (String key : keyToId.keySet()) {
params.add(key);
}
if (id == null) {
params.add(">");
} else {
params.add(id);
}
for (StreamMessageId nextId : keyToId.values()) {
params.add(nextId.toString());
}
if (timeout > 0) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREADGROUP_BLOCKING, params.toArray());
}
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREADGROUP, params.toArray());
}
@Override
public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, StreamMessageId... ids) {
@ -330,8 +509,18 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<StreamMessageId, Map<K, V>> read(int count, StreamMessageId ... ids) {
return get(readAsync(count, ids));
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return read(0, id, keyToId);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readAsync(0, id, keyToId);
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return get(readAsync(count, id, keyToId));
}
@Override
@ -340,10 +529,124 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<StreamMessageId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamMessageId... ids) {
return get(readAsync(count, timeout, unit, ids));
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2) {
return readAsync(count, timeout, unit, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(count, timeout, unit, id, params);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readAsync(0, timeout, unit, id, keyToId);
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return get(readAsync(count, timeout, unit, id, keyToId));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return read(0, timeout, unit, id, keyToId);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2) {
return readAsync(id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(id, params);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2) {
return readAsync(count, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2,
String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(count, id, params);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2) {
return readAsync(timeout, unit, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(timeout, unit, id, params);
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String key2, StreamMessageId id2) {
return get(readAsync(id, key2, id2));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
return get(readAsync(id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String key2, StreamMessageId id2) {
return get(readAsync(count, id, key2, id2));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
return get(readAsync(count, id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2) {
return get(readAsync(timeout, unit, id, key2, id2));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2, String key3, StreamMessageId id3) {
return get(readAsync(timeout, unit, id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2) {
return get(readAsync(count, timeout, unit, id, key2, id2));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2, String key3, StreamMessageId id3) {
return get(readAsync(count, timeout, unit, id, key2, id2, key3, id3));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
List<Object> params = new ArrayList<Object>();
@ -448,6 +751,16 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, StreamMessageId... ids) {
return readAsync(count, 0, null, ids);
}
@Override
public Map<StreamMessageId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamMessageId... ids) {
return get(readAsync(count, timeout, unit, ids));
}
@Override
public Map<StreamMessageId, Map<K, V>> read(int count, StreamMessageId ... ids) {
return get(readAsync(count, ids));
}
@Override
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit,
@ -476,16 +789,6 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREAD_SINGLE, params.toArray());
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return get(readAsync(count, id, keyToId));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return get(readAsync(count, timeout, unit, id, keyToId));
}
@Override
public RFuture<Map<StreamMessageId, Map<K, V>>> rangeAsync(int count, StreamMessageId startId, StreamMessageId endId) {
List<Object> params = new LinkedList<Object>();
@ -536,16 +839,6 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
return readAsync(0, timeout, unit, ids);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readAsync(0, id, keyToId);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readAsync(0, timeout, unit, id, keyToId);
}
@Override
public RFuture<Map<StreamMessageId, Map<K, V>>> rangeAsync(StreamMessageId startId, StreamMessageId endId) {
return rangeAsync(0, startId, endId);
@ -566,16 +859,6 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
return read(0, timeout, unit, ids);
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return read(0, id, keyToId);
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return read(0, timeout, unit, id, keyToId);
}
@Override
public Map<StreamMessageId, Map<K, V>> range(StreamMessageId startId, StreamMessageId endId) {
return range(0, startId, endId);
@ -586,110 +869,6 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
return rangeReversed(0, startId, endId);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2) {
return readAsync(id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(id, params);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2) {
return readAsync(count, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2,
String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(count, id, params);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2) {
return readAsync(timeout, unit, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(timeout, unit, id, params);
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2) {
return readAsync(count, timeout, unit, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(count, timeout, unit, id, params);
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String key2, StreamMessageId id2) {
return get(readAsync(id, key2, id2));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
return get(readAsync(id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String key2, StreamMessageId id2) {
return get(readAsync(count, id, key2, id2));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
return get(readAsync(count, id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2) {
return get(readAsync(timeout, unit, id, key2, id2));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2, String key3, StreamMessageId id3) {
return get(readAsync(timeout, unit, id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2) {
return get(readAsync(count, timeout, unit, id, key2, id2));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2, String key3, StreamMessageId id3) {
return get(readAsync(count, timeout, unit, id, key2, id2, key3, id3));
}
@Override
public RFuture<Long> removeAsync(StreamMessageId... ids) {
List<Object> params = new ArrayList<Object>();

@ -170,7 +170,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
* Waits for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
@ -183,7 +183,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
* Waits for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
@ -194,6 +194,184 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @return stream data mapped by Stream Message ID
*/
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2, String key3, StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2, String key3, StreamMessageId id3);
/**
* Returns number of entries in stream

@ -199,6 +199,168 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
*/
RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
*/
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName,StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
*/
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
*/
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
*/
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
*/
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
*/
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
*/
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
*/
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
*/
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2,
String key3, StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
*/
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
*/
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3);
/**
* Returns number of entries in stream
*

@ -77,6 +77,7 @@ public interface RTopicRx {
/**
* Returns stream of messages.
*
* @param <M> - type of message
* @param type - type of message to listen
* @return stream of messages
*/

@ -333,8 +333,8 @@ public interface RedisCommands {
new ObjectMapReplayDecoder(ListMultiDecoder.RESET),
new ObjectMapJoinDecoder()));
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREVRANGE = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREVRANGE",
XRANGE.getReplayMultiDecoder());
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREVRANGE =
new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREVRANGE", XRANGE.getReplayMultiDecoder());
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREAD = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREAD",
new ListMultiDecoder(
@ -358,7 +358,8 @@ public interface RedisCommands {
new ObjectMapReplayDecoder(),
new StreamResultDecoder()));
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREAD_BLOCKING_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREAD", XREAD_SINGLE.getReplayMultiDecoder());
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREAD_BLOCKING_SINGLE =
new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREAD", XREAD_SINGLE.getReplayMultiDecoder());
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREADGROUP = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREADGROUP",
new ListMultiDecoder(
@ -370,15 +371,8 @@ public interface RedisCommands {
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX),
new StreamResultDecoder()));
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREADGROUP_BLOCKING = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREADGROUP",
new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()),
new ObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1),
new ObjectMapJoinDecoder(),
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX),
new StreamResultDecoder()));
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREADGROUP_BLOCKING =
new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREADGROUP", XREADGROUP.getReplayMultiDecoder());
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREADGROUP_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREADGROUP",
new ListMultiDecoder(

@ -2,6 +2,7 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@ -199,6 +200,30 @@ public class RedissonStreamTest extends BaseTest {
assertThat(stream.ack("testGroup", id1, id2)).isEqualTo(2);
}
@Test
public void testReadGroupMulti() {
RStream<String, String> stream1 = redisson.getStream("test1");
RStream<String, String> stream2 = redisson.getStream("test2");
StreamMessageId id01 = stream1.add("0", "0");
StreamMessageId id02 = stream2.add("0", "0");
stream1.createGroup("testGroup", id01);
stream2.createGroup("testGroup", id02);
StreamMessageId id11 = stream1.add("1", "1");
StreamMessageId id12 = stream1.add("2", "2");
StreamMessageId id13 = stream1.add("3", "3");
StreamMessageId id21 = stream2.add("1", "1");
StreamMessageId id22 = stream2.add("2", "2");
StreamMessageId id23 = stream2.add("3", "3");
Map<String, Map<StreamMessageId, Map<String, String>>> s2 = stream1.readGroup("testGroup", "consumer1", id11, Collections.singletonMap("test2", id21));
assertThat(s2.keySet()).containsExactly("test1", "test2");
assertThat(s2.get("test1").keySet()).containsExactly(id12, id13);
assertThat(s2.get("test2").keySet()).containsExactly(id22, id23);
}
@Test
public void testReadGroup() {
RStream<String, String> stream = redisson.getStream("test");

Loading…
Cancel
Save