Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/api/RStreamReactive.java
pull/1821/head
Nikita Koksharov 6 years ago
commit 03aa5d3676

@ -27,7 +27,7 @@ import org.redisson.api.PendingEntry;
import org.redisson.api.PendingResult;
import org.redisson.api.RFuture;
import org.redisson.api.RStream;
import org.redisson.api.StreamId;
import org.redisson.api.StreamMessageId;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
@ -69,25 +69,25 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
@Override
public RFuture<Void> createGroupAsync(String groupName) {
return createGroupAsync(groupName, StreamId.NEWEST);
return createGroupAsync(groupName, StreamMessageId.NEWEST);
}
@Override
public void createGroup(String groupName, StreamId id) {
public void createGroup(String groupName, StreamMessageId id) {
get(createGroupAsync(groupName, id));
}
@Override
public RFuture<Void> createGroupAsync(String groupName, StreamId id) {
public RFuture<Void> createGroupAsync(String groupName, StreamMessageId id) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "CREATE", getName(), groupName, id);
}
@Override
public RFuture<Long> ackAsync(String groupName, StreamId... ids) {
public RFuture<Long> ackAsync(String groupName, StreamMessageId... ids) {
List<Object> params = new ArrayList<Object>();
params.add(getName());
params.add(groupName);
for (StreamId id : ids) {
for (StreamMessageId id : ids) {
params.add(id);
}
@ -95,7 +95,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Long ack(String groupName, StreamId... id) {
public long ack(String groupName, StreamMessageId... id) {
return get(ackAsync(groupName, id));
}
@ -110,37 +110,59 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count,
String consumerName) {
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, startId, endId, count, consumerName);
}
@Override
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count) {
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, int count) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, startId, endId, count);
}
@Override
public List<PendingEntry> listPending(String groupName, StreamId startId, StreamId endId, int count) {
public List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count) {
return get(listPendingAsync(groupName, startId, endId, count));
}
@Override
public List<PendingEntry> listPending(String groupName, StreamId startId, StreamId endId, int count,
String consumerName) {
return get(listPendingAsync(groupName, startId, endId, count, consumerName));
public List<PendingEntry> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count) {
return get(listPendingAsync(groupName, consumerName, startId, endId, count));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> claimAsync(String groupName, String consumerName, long idleTime,
TimeUnit idleTimeUnit, StreamId... ids) {
public List<StreamMessageId> fastClaim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit,
StreamMessageId... ids) {
return get(fastClaimAsync(groupName, consumerName, idleTime, idleTimeUnit, ids));
}
@Override
public RFuture<List<StreamMessageId>> fastClaimAsync(String groupName, String consumerName, long idleTime,
TimeUnit idleTimeUnit, StreamMessageId... ids) {
List<Object> params = new ArrayList<Object>();
params.add(getName());
params.add(groupName);
params.add(consumerName);
params.add(idleTimeUnit.toMillis(idleTime));
for (StreamId id : ids) {
for (StreamMessageId id : ids) {
params.add(id.toString());
}
params.add("JUSTID");
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XCLAIM_IDS, params.toArray());
}
@Override
public RFuture<Map<StreamMessageId, Map<K, V>>> claimAsync(String groupName, String consumerName, long idleTime,
TimeUnit idleTimeUnit, StreamMessageId... ids) {
List<Object> params = new ArrayList<Object>();
params.add(getName());
params.add(groupName);
params.add(consumerName);
params.add(idleTimeUnit.toMillis(idleTime));
for (StreamMessageId id : ids) {
params.add(id.toString());
}
@ -148,30 +170,30 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<StreamId, Map<K, V>> claimPending(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit,
StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit,
StreamMessageId... ids) {
return get(claimAsync(groupName, consumerName, idleTime, idleTimeUnit, ids));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamMessageId... ids) {
return readGroupAsync(groupName, consumerName, 0, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId... ids) {
return readGroupAsync(groupName, consumerName, count, 0, null, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit,
StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit,
StreamMessageId... ids) {
return readGroupAsync(groupName, consumerName, 0, timeout, unit, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit,
StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit,
StreamMessageId... ids) {
List<Object> params = new ArrayList<Object>();
params.add("GROUP");
params.add(groupName);
@ -194,7 +216,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
params.add(">");
}
for (StreamId id : ids) {
for (StreamMessageId id : ids) {
params.add(id.toString());
}
@ -205,62 +227,62 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, StreamMessageId... ids) {
return get(readGroupAsync(groupName, consumerName, ids));
}
@Override
public Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamMessageId... ids) {
return get(readGroupAsync(groupName, consumerName, count, ids));
}
@Override
public Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId... ids) {
return get(readGroupAsync(groupName, consumerName, timeout, unit, ids));
}
@Override
public Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit,
StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit,
StreamMessageId... ids) {
return get(readGroupAsync(groupName, consumerName, count, timeout, unit, ids));
}
@Override
public StreamId addAll(Map<K, V> entries) {
public StreamMessageId addAll(Map<K, V> entries) {
return addAll(entries, 0, false);
}
@Override
public RFuture<StreamId> addAllAsync(Map<K, V> entries) {
public RFuture<StreamMessageId> addAllAsync(Map<K, V> entries) {
return addAllAsync(entries, 0, false);
}
@Override
public void addAll(StreamId id, Map<K, V> entries) {
public void addAll(StreamMessageId id, Map<K, V> entries) {
addAll(id, entries, 0, false);
}
@Override
public RFuture<Void> addAllAsync(StreamId id, Map<K, V> entries) {
public RFuture<Void> addAllAsync(StreamMessageId id, Map<K, V> entries) {
return addAllAsync(id, entries, 0, false);
}
@Override
public StreamId addAll(Map<K, V> entries, int trimLen, boolean trimStrict) {
public StreamMessageId addAll(Map<K, V> entries, int trimLen, boolean trimStrict) {
return get(addAllAsync(entries, trimLen, trimStrict));
}
@Override
public RFuture<StreamId> addAllAsync(Map<K, V> entries, int trimLen, boolean trimStrict) {
public RFuture<StreamMessageId> addAllAsync(Map<K, V> entries, int trimLen, boolean trimStrict) {
return addAllCustomAsync(null, entries, trimLen, trimStrict);
}
@Override
public void addAll(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
public void addAll(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
get(addAllAsync(id, entries, trimLen, trimStrict));
}
private <R> RFuture<R> addAllCustomAsync(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
private <R> RFuture<R> addAllCustomAsync(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
List<Object> params = new ArrayList<Object>(entries.size()*2 + 1);
params.add(getName());
@ -293,7 +315,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public RFuture<Void> addAllAsync(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
public RFuture<Void> addAllAsync(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
return addAllCustomAsync(id, entries, trimLen, trimStrict);
}
@ -308,22 +330,22 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<StreamId, Map<K, V>> read(int count, StreamId ... ids) {
public Map<StreamMessageId, Map<K, V>> read(int count, StreamMessageId ... ids) {
return get(readAsync(count, ids));
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, Map<String, StreamId> keyToId) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readAsync(count, -1, null, id, keyToId);
}
@Override
public Map<StreamId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamMessageId... ids) {
return get(readAsync(count, timeout, unit, ids));
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
List<Object> params = new ArrayList<Object>();
if (count > 0) {
params.add("COUNT");
@ -342,7 +364,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
params.add(id);
for (StreamId nextId : keyToId.values()) {
for (StreamMessageId nextId : keyToId.values()) {
params.add(nextId.toString());
}
@ -353,21 +375,21 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public RFuture<StreamId> addAsync(K key, V value) {
public RFuture<StreamMessageId> addAsync(K key, V value) {
return addAsync(key, value, 0, false);
}
@Override
public RFuture<Void> addAsync(StreamId id, K key, V value) {
public RFuture<Void> addAsync(StreamMessageId id, K key, V value) {
return addAsync(id, key, value, 0, false);
}
@Override
public RFuture<StreamId> addAsync(K key, V value, int trimLen, boolean trimStrict) {
public RFuture<StreamMessageId> addAsync(K key, V value, int trimLen, boolean trimStrict) {
return addCustomAsync(null, key, value, trimLen, trimStrict);
}
private <R> RFuture<R> addCustomAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict) {
private <R> RFuture<R> addCustomAsync(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) {
List<Object> params = new LinkedList<Object>();
params.add(getName());
@ -398,38 +420,38 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public RFuture<Void> addAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict) {
public RFuture<Void> addAsync(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) {
return addCustomAsync(id, key, value, trimLen, trimStrict);
}
@Override
public StreamId add(K key, V value) {
public StreamMessageId add(K key, V value) {
return get(addAsync(key, value));
}
@Override
public void add(StreamId id, K key, V value) {
public void add(StreamMessageId id, K key, V value) {
get(addAsync(id, key, value));
}
@Override
public StreamId add(K key, V value, int trimLen, boolean trimStrict) {
public StreamMessageId add(K key, V value, int trimLen, boolean trimStrict) {
return get(addAsync(key, value, trimLen, trimStrict));
}
@Override
public void add(StreamId id, K key, V value, int trimLen, boolean trimStrict) {
public void add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) {
get(addAsync(id, key, value, trimLen, trimStrict));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readAsync(int count, StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, StreamMessageId... ids) {
return readAsync(count, 0, null, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit,
StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit,
StreamMessageId... ids) {
List<Object> params = new ArrayList<Object>();
if (count > 0) {
params.add("COUNT");
@ -444,7 +466,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
params.add("STREAMS");
params.add(getName());
for (StreamId id : ids) {
for (StreamMessageId id : ids) {
params.add(id.toString());
}
@ -455,17 +477,17 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, Map<String, StreamId> keyToId) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return get(readAsync(count, id, keyToId));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return get(readAsync(count, timeout, unit, id, keyToId));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> rangeAsync(int count, StreamId startId, StreamId endId) {
public RFuture<Map<StreamMessageId, Map<K, V>>> rangeAsync(int count, StreamMessageId startId, StreamMessageId endId) {
List<Object> params = new LinkedList<Object>();
params.add(getName());
params.add(startId);
@ -480,12 +502,12 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<StreamId, Map<K, V>> range(int count, StreamId startId, StreamId endId) {
public Map<StreamMessageId, Map<K, V>> range(int count, StreamMessageId startId, StreamMessageId endId) {
return get(rangeAsync(count, startId, endId));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> rangeReversedAsync(int count, StreamId startId, StreamId endId) {
public RFuture<Map<StreamMessageId, Map<K, V>>> rangeReversedAsync(int count, StreamMessageId startId, StreamMessageId endId) {
List<Object> params = new LinkedList<Object>();
params.add(getName());
params.add(startId);
@ -500,172 +522,238 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<StreamId, Map<K, V>> rangeReversed(int count, StreamId startId, StreamId endId) {
public Map<StreamMessageId, Map<K, V>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId) {
return get(rangeReversedAsync(count, startId, endId));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readAsync(StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(StreamMessageId... ids) {
return readAsync(0, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamMessageId... ids) {
return readAsync(0, timeout, unit, ids);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, Map<String, StreamId> keyToId) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readAsync(0, id, keyToId);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readAsync(0, timeout, unit, id, keyToId);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> rangeAsync(StreamId startId, StreamId endId) {
public RFuture<Map<StreamMessageId, Map<K, V>>> rangeAsync(StreamMessageId startId, StreamMessageId endId) {
return rangeAsync(0, startId, endId);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> rangeReversedAsync(StreamId startId, StreamId endId) {
public RFuture<Map<StreamMessageId, Map<K, V>>> rangeReversedAsync(StreamMessageId startId, StreamMessageId endId) {
return rangeReversedAsync(0, startId, endId);
}
@Override
public Map<StreamId, Map<K, V>> read(StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> read(StreamMessageId... ids) {
return read(0, ids);
}
@Override
public Map<StreamId, Map<K, V>> read(long timeout, TimeUnit unit, StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> read(long timeout, TimeUnit unit, StreamMessageId... ids) {
return read(0, timeout, unit, ids);
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, Map<String, StreamId> keyToId) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return read(0, id, keyToId);
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return read(0, timeout, unit, id, keyToId);
}
@Override
public Map<StreamId, Map<K, V>> range(StreamId startId, StreamId endId) {
public Map<StreamMessageId, Map<K, V>> range(StreamMessageId startId, StreamMessageId endId) {
return range(0, startId, endId);
}
@Override
public Map<StreamId, Map<K, V>> rangeReversed(StreamId startId, StreamId endId) {
public Map<StreamMessageId, Map<K, V>> rangeReversed(StreamMessageId startId, StreamMessageId endId) {
return rangeReversed(0, startId, endId);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String key2, StreamId id2) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2) {
return readAsync(id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String key2, StreamId id2, String key3,
StreamId id3) {
Map<String, StreamId> params = new HashMap<String, StreamId>(2);
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(id, params);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String key2, StreamId id2) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2) {
return readAsync(count, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String key2, StreamId id2,
String key3, StreamId id3) {
Map<String, StreamId> params = new HashMap<String, StreamId>(2);
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2,
String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(count, id, params);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id,
String key2, StreamId id2) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2) {
return readAsync(timeout, unit, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id,
String key2, StreamId id2, String key3, StreamId id3) {
Map<String, StreamId> params = new HashMap<String, StreamId>(2);
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(timeout, unit, id, params);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id,
String key2, StreamId id2) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2) {
return readAsync(count, timeout, unit, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id,
String key2, StreamId id2, String key3, StreamId id3) {
Map<String, StreamId> params = new HashMap<String, StreamId>(2);
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(count, timeout, unit, id, params);
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String key2, StreamId id2) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String key2, StreamMessageId id2) {
return get(readAsync(id, key2, id2));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String key2, StreamId id2, String key3,
StreamId id3) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
return get(readAsync(id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String key2, StreamId id2) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String key2, StreamMessageId id2) {
return get(readAsync(count, id, key2, id2));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String key2, StreamId id2, String key3,
StreamId id3) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
return get(readAsync(count, id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String key2,
StreamId id2) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2) {
return get(readAsync(timeout, unit, id, key2, id2));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String key2,
StreamId id2, String key3, StreamId id3) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2, String key3, StreamMessageId id3) {
return get(readAsync(timeout, unit, id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String key2,
StreamId id2) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2) {
return get(readAsync(count, timeout, unit, id, key2, id2));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String key2,
StreamId id2, String key3, StreamId id3) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2, String key3, StreamMessageId id3) {
return get(readAsync(count, timeout, unit, id, key2, id2, key3, id3));
}
@Override
public RFuture<Long> removeAsync(StreamMessageId... ids) {
List<Object> params = new ArrayList<Object>();
params.add(getName());
for (StreamMessageId id : ids) {
params.add(id);
}
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XDEL, params.toArray());
}
@Override
public long remove(StreamMessageId... ids) {
return get(removeAsync(ids));
}
@Override
public RFuture<Long> trimAsync(int count) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XTRIM, "MAXLEN", count);
}
@Override
public RFuture<Long> trimNonStrictAsync(int count) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XTRIM, "MAXLEN", "~", count);
}
@Override
public long trim(int count) {
return get(trimAsync(count));
}
@Override
public long trimNonStrict(int count) {
return get(trimNonStrictAsync(count));
}
@Override
public RFuture<Void> removeGroupAsync(String groupName) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "DESTROY", getName(), groupName);
}
@Override
public void removeGroup(String groupName) {
get(removeGroupAsync(groupName));
}
@Override
public RFuture<Long> removeConsumerAsync(String groupName, String consumerName) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP_LONG, "DELCONSUMER", getName(), groupName, consumerName);
}
@Override
public long removeConsumer(String groupName, String consumerName) {
return get(removeConsumerAsync(groupName, consumerName));
}
@Override
public RFuture<Void> updateGroupMessageIdAsync(String groupName, StreamMessageId id) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "SETID", getName(), groupName, id);
}
@Override
public void updateGroupMessageId(String groupName, StreamMessageId id) {
get(updateGroupMessageIdAsync(groupName, id));
}
}

@ -23,12 +23,12 @@ package org.redisson.api;
*/
public class PendingEntry {
private StreamId id;
private StreamMessageId id;
private String consumerName;
private long idleTime;
private long lastTimeDelivered;
public PendingEntry(StreamId id, String consumerName, long idleTime, long lastTimeDelivered) {
public PendingEntry(StreamMessageId id, String consumerName, long idleTime, long lastTimeDelivered) {
super();
this.id = id;
this.consumerName = consumerName;
@ -41,7 +41,7 @@ public class PendingEntry {
*
* @return id
*/
public StreamId getId() {
public StreamMessageId getId() {
return id;
}

@ -29,14 +29,14 @@ public class PendingResult implements Serializable {
private static final long serialVersionUID = -5525031552305408248L;
private long total;
private StreamId lowestId;
private StreamId highestId;
private StreamMessageId lowestId;
private StreamMessageId highestId;
private Map<String, Long> consumerNames;
public PendingResult() {
}
public PendingResult(long total, StreamId lowestId, StreamId highestId, Map<String, Long> consumerNames) {
public PendingResult(long total, StreamMessageId lowestId, StreamMessageId highestId, Map<String, Long> consumerNames) {
super();
this.total = total;
this.lowestId = lowestId;
@ -58,7 +58,7 @@ public class PendingResult implements Serializable {
*
* @return number
*/
public StreamId getLowestId() {
public StreamMessageId getLowestId() {
return lowestId;
}
@ -67,7 +67,7 @@ public class PendingResult implements Serializable {
*
* @return number
*/
public StreamId getHighestId() {
public StreamMessageId getHighestId() {
return highestId;
}

@ -39,24 +39,48 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
void createGroup(String groupName);
/**
* Creates consumer group by name and stream id.
* Creates consumer group by name and Stream Message ID.
* Only new messages after defined stream <code>id</code> will be available for consumers of this group.
* <p>
* {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating
* {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creating
*
* @param groupName - name of group
* @param id - stream id
* @param id - Stream Message ID
*/
void createGroup(String groupName, StreamId id);
void createGroup(String groupName, StreamMessageId id);
/**
* Removes group by name.
*
* @param groupName - name of group
*/
void removeGroup(String groupName);
/**
* Removes consumer of the group by name.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @return number of pending messages owned by consumer
*/
long removeConsumer(String groupName, String consumerName);
/**
* Updates next message id delivered to consumers.
*
* @param groupName - name of group
* @param id - Stream Message ID
*/
void updateGroupMessageId(String groupName, StreamMessageId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
*
* @param groupName - name of group
* @param ids - stream ids
* @param ids - Stream Message IDs
* @return marked messages amount
*/
Long ack(String groupName, StreamId... ids);
long ack(String groupName, StreamMessageId... ids);
/**
* Returns pending messages by group name
@ -68,34 +92,47 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
/**
* Returns list of pending messages by group name.
* Limited by start stream id and end stream id and count.
* Limited by start Stream Message ID and end Stream Message ID and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max Stream Message ID
* {@link StreamMessageId#MIN} is used as min Stream Message ID
*
* @param groupName - name of group
* @param startId - start stream id
* @param endId - end stream id
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @param count - amount of messages
* @return list
*/
List<PendingEntry> listPending(String groupName, StreamId startId, StreamId endId, int count);
List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Returns list of pending messages by group name and consumer name.
* Limited by start stream id and end stream id and count.
* Limited by start Stream Message ID and end Stream Message ID and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max Stream Message ID
* {@link StreamMessageId#MIN} is used as min Stream Message ID
*
* @param consumerName - name of consumer
* @param groupName - name of group
* @param startId - start stream id
* @param endId - end stream id
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @param count - amount of messages
* @return list
*/
List<PendingEntry> listPending(String groupName, StreamId startId, StreamId endId, int count, String consumerName);
List<PendingEntry> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Transfers ownership of pending messages by id to a new consumer
* by name if idle time of messages is greater than defined value.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param ids - Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamMessageId, Map<K, V>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids);
/**
* Transfers ownership of pending messages by id to a new consumer
@ -105,47 +142,47 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param consumerName - name of consumer
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param ids - stream ids
* @return stream data mapped by Stream ID
* @param ids - Stream Message IDs
* @return list of Stream Message IDs
*/
Map<StreamId, Map<K, V>> claimPending(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids);
List<StreamMessageId> fastClaim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
@ -153,10 +190,10 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Returns number of entries in stream
@ -166,25 +203,25 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
long size();
/**
* Appends a new entry and returns generated Stream ID
* Appends a new entry and returns generated Stream Message ID
*
* @param key - key of entry
* @param value - value of entry
* @return Stream ID
* @return Stream Message ID
*/
StreamId add(K key, V value);
StreamMessageId add(K key, V value);
/**
* Appends a new entry by specified Stream ID
* Appends a new entry by specified Stream Message ID
*
* @param id - Stream ID
* @param id - Stream Message ID
* @param key - key of entry
* @param value - value of entry
*/
void add(StreamId id, K key, V value);
void add(StreamMessageId id, K key, V value);
/**
* Appends a new entry and returns generated Stream ID.
* Appends a new entry and returns generated Stream Message ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
@ -192,102 +229,102 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param value - value of entry
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
* @return Stream Message ID
*/
StreamId add(K key, V value, int trimLen, boolean trimStrict);
StreamMessageId add(K key, V value, int trimLen, boolean trimStrict);
/**
* Appends a new entry by specified Stream ID.
* Appends a new entry by specified Stream Message ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param id - Stream ID
* @param id - Stream Message ID
* @param key - key of entry
* @param value - value of entry
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
*/
void add(StreamId id, K key, V value, int trimLen, boolean trimStrict);
void add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict);
/**
* Appends new entries and returns generated Stream ID
* Appends new entries and returns generated Stream Message ID
*
* @param entries - entries to add
* @return Stream ID
* @return Stream Message ID
*/
StreamId addAll(Map<K, V> entries);
StreamMessageId addAll(Map<K, V> entries);
/**
* Appends new entries by specified Stream ID
* Appends new entries by specified Stream Message ID
*
* @param id - Stream ID
* @param id - Stream Message ID
* @param entries - entries to add
*/
void addAll(StreamId id, Map<K, V> entries);
void addAll(StreamMessageId id, Map<K, V> entries);
/**
* Appends new entries and returns generated Stream ID.
* Appends new entries and returns generated Stream Message ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param entries - entries to add
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
* @return Stream Message ID
*/
StreamId addAll(Map<K, V> entries, int trimLen, boolean trimStrict);
StreamMessageId addAll(Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Appends new entries by specified Stream ID.
* Appends new entries by specified Stream Message ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param id - Stream ID
* @param id - Stream Message ID
* @param entries - entries to add
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
*/
void addAll(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict);
void addAll(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream IDs.
* Read stream data by specified collection of Stream Message IDs.
*
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> read(StreamId ... ids);
Map<StreamMessageId, Map<K, V>> read(StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Read stream data by specified collection of Stream Message IDs.
*
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> read(int count, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> read(int count, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Read stream data by specified collection of Stream Message IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> read(long timeout, TimeUnit unit, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> read(long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Read stream data by specified collection of Stream Message IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified stream name including this stream.
@ -295,9 +332,9 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String name2, StreamId id2);
Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -307,18 +344,18 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Read stream data by specified Stream Message ID mapped by name including this stream.
*
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
* @param nameToId - Stream Message ID mapped by name
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, Map<String, StreamId> nameToId);
Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -327,9 +364,9 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String name2, StreamId id2);
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -340,19 +377,19 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Read stream data by specified Stream Message ID mapped by name including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
* @param nameToId - Stream Message ID mapped by name
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, Map<String, StreamId> nameToId);
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -363,9 +400,9 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -378,21 +415,21 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Read stream data by specified Stream Message ID mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
* @param nameToId - Stream Message ID mapped by name
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -404,9 +441,9 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -420,59 +457,83 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Read stream data by specified Stream Message ID mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
* @param nameToId - Stream Message ID mapped by name
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
* Read stream data in range by specified start Stream Message ID (included) and end Stream Message ID (included).
*
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> range(StreamId startId, StreamId endId);
Map<StreamMessageId, Map<K, V>> range(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
* Read stream data in range by specified start Stream Message ID (included) and end Stream Message ID (included).
*
* @param count - stream data size limit
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> range(int count, StreamId startId, StreamId endId);
Map<StreamMessageId, Map<K, V>> range(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
* Read stream data in reverse order in range by specified start Stream Message ID (included) and end Stream Message ID (included).
*
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> rangeReversed(StreamId startId, StreamId endId);
Map<StreamMessageId, Map<K, V>> rangeReversed(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
* Read stream data in reverse order in range by specified start Stream Message ID (included) and end Stream Message ID (included).
*
* @param count - stream data size limit
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @return stream data mapped by Stream Message ID
*/
Map<StreamMessageId, Map<K, V>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Removes messages by id.
*
* @param ids - id of messages to remove
* @return deleted messages amount
*/
long remove(StreamMessageId... ids);
/**
* Trims stream to specified size
*
* @param size - new size of stream
* @return number of deleted messages
*/
long trim(int size);
/**
* Trims stream to few tens of entries more than specified length to trim.
*
* @param size - new size of stream
* @return number of deleted messages
*/
Map<StreamId, Map<K, V>> rangeReversed(int count, StreamId startId, StreamId endId);
long trimNonStrict(int size);
}

@ -43,13 +43,39 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* Creates consumer group by name and stream id.
* Only new messages after defined stream <code>id</code> will be available for consumers of this group.
* <p>
* {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating
* {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creating
*
* @param groupName - name of group
* @param id - stream id
* @return void
*/
RFuture<Void> createGroupAsync(String groupName, StreamId id);
RFuture<Void> createGroupAsync(String groupName, StreamMessageId id);
/**
* Removes group by name.
*
* @param groupName - name of group
* @return void
*/
RFuture<Void> removeGroupAsync(String groupName);
/**
* Removes consumer of the group by name.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @return number of pending messages owned by consumer
*/
RFuture<Long> removeConsumerAsync(String groupName, String consumerName);
/**
* Updates next message id delivered to consumers.
*
* @param groupName - name of group
* @param id - Stream Message ID
* @return void
*/
RFuture<Void> updateGroupMessageIdAsync(String groupName, StreamMessageId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
@ -58,7 +84,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - stream ids
* @return marked messages amount
*/
RFuture<Long> ackAsync(String groupName, StreamId... ids);
RFuture<Long> ackAsync(String groupName, StreamMessageId... ids);
/**
* Returns pending messages by group name
@ -72,8 +98,8 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* Returns list of pending messages by group name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max stream id
* {@link StreamMessageId#MIN} is used as min stream id
*
* @param groupName - name of group
* @param startId - start stream id
@ -81,14 +107,14 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param count - amount of messages
* @return list
*/
RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count);
RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Returns list of pending messages by group name and consumer name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max stream id
* {@link StreamMessageId#MIN} is used as min stream id
*
* @param consumerName - name of consumer
* @param groupName - name of group
@ -97,7 +123,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param count - amount of messages
* @return list
*/
RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count, String consumerName);
RFuture<List<PendingEntry>> listPendingAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Transfers ownership of pending messages by id to a new consumer
@ -110,7 +136,20 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - stream ids
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> claimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> claimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids);
/**
* Transfers ownership of pending messages by id to a new consumer
* by name if idle time of messages is greater than defined value.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param ids - Stream Message IDs
* @return list of Stream Message IDs
*/
RFuture<List<StreamMessageId>> fastClaimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -120,7 +159,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -131,7 +170,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -144,7 +183,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -158,7 +197,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Returns number of entries in stream
@ -174,7 +213,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param value - value of entry
* @return Stream ID
*/
RFuture<StreamId> addAsync(K key, V value);
RFuture<StreamMessageId> addAsync(K key, V value);
/**
* Appends a new entry by specified Stream ID
@ -184,7 +223,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param value - value of entry
* @return void
*/
RFuture<Void> addAsync(StreamId id, K key, V value);
RFuture<Void> addAsync(StreamMessageId id, K key, V value);
/**
* Appends a new entry and returns generated Stream ID.
@ -197,7 +236,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
RFuture<StreamId> addAsync(K key, V value, int trimLen, boolean trimStrict);
RFuture<StreamMessageId> addAsync(K key, V value, int trimLen, boolean trimStrict);
/**
* Appends a new entry by specified Stream ID.
@ -211,7 +250,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
RFuture<Void> addAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict);
RFuture<Void> addAsync(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict);
/**
* Appends new entries and returns generated Stream ID
@ -219,7 +258,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param entries - entries to add
* @return Stream ID
*/
RFuture<StreamId> addAllAsync(Map<K, V> entries);
RFuture<StreamMessageId> addAllAsync(Map<K, V> entries);
/**
* Appends new entries by specified Stream ID
@ -228,7 +267,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param entries - entries to add
* @return void
*/
RFuture<Void> addAllAsync(StreamId id, Map<K, V> entries);
RFuture<Void> addAllAsync(StreamMessageId id, Map<K, V> entries);
/**
* Appends new entries and returns generated Stream ID.
@ -240,7 +279,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
RFuture<StreamId> addAllAsync(Map<K, V> entries, int trimLen, boolean trimStrict);
RFuture<StreamMessageId> addAllAsync(Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Appends new entries by specified Stream ID.
@ -253,7 +292,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
RFuture<Void> addAllAsync(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict);
RFuture<Void> addAllAsync(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream IDs.
@ -261,7 +300,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readAsync(StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -270,7 +309,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readAsync(int count, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -281,7 +320,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -293,7 +332,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified stream name including this stream.
@ -303,7 +342,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String name2, StreamId id2);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -315,7 +354,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String name2, StreamId id2, String name3, StreamId id3);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -324,7 +363,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, Map<String, StreamId> nameToId);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -335,7 +374,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String name2, StreamId id2);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -348,7 +387,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -358,7 +397,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, Map<String, StreamId> nameToId);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -371,7 +410,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -386,7 +425,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -398,7 +437,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -412,7 +451,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -428,7 +467,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -441,7 +480,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
@ -450,7 +489,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> rangeAsync(StreamId startId, StreamId endId);
RFuture<Map<StreamMessageId, Map<K, V>>> rangeAsync(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
@ -460,7 +499,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> rangeAsync(int count, StreamId startId, StreamId endId);
RFuture<Map<StreamMessageId, Map<K, V>>> rangeAsync(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
@ -469,7 +508,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> rangeReversedAsync(StreamId startId, StreamId endId);
RFuture<Map<StreamMessageId, Map<K, V>>> rangeReversedAsync(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
@ -479,6 +518,30 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> rangeReversedAsync(int count, StreamId startId, StreamId endId);
RFuture<Map<StreamMessageId, Map<K, V>>> rangeReversedAsync(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Removes messages by id.
*
* @param ids - id of messages to remove
* @return deleted messages amount
*/
RFuture<Long> removeAsync(StreamMessageId... ids);
/**
* Trims stream to specified size
*
* @param size - new size of stream
* @return number of deleted messages
*/
RFuture<Long> trimAsync(int size);
/**
* Trims stream to few tens of entries more than specified length to trim.
*
* @param size - new size of stream
* @return number of deleted messages
*/
RFuture<Long> trimNonStrictAsync(int size);
}

@ -45,13 +45,39 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* Creates consumer group by name and stream id.
* Only new messages after defined stream <code>id</code> will be available for consumers of this group.
* <p>
* {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating
* {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creating
*
* @param groupName - name of group
* @param id - stream id
* @return void
*/
Mono<Void> createGroup(String groupName, StreamId id);
Mono<Void> createGroup(String groupName, StreamMessageId id);
/**
* Removes group by name.
*
* @param groupName - name of group
* @return void
*/
Mono<Void> removeGroup(String groupName);
/**
* Removes consumer of the group by name.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @return number of pending messages owned by consumer
*/
Mono<Long> removeConsumer(String groupName, String consumerName);
/**
* Updates next message id delivered to consumers.
*
* @param groupName - name of group
* @param id - Stream Message ID
* @return void
*/
Mono<Void> updateGroupMessageId(String groupName, StreamMessageId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
@ -60,7 +86,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - stream ids
* @return marked messages amount
*/
Mono<Long> ack(String groupName, StreamId... ids);
Mono<Long> ack(String groupName, StreamMessageId... ids);
/**
* Returns pending messages by group name
@ -74,8 +100,8 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* Returns list of pending messages by group name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max stream id
* {@link StreamMessageId#MIN} is used as min stream id
*
* @param groupName - name of group
* @param startId - start stream id
@ -83,14 +109,14 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param count - amount of messages
* @return list
*/
Mono<List<PendingEntry>> listPending(String groupName, StreamId startId, StreamId endId, int count);
Mono<List<PendingEntry>> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Returns list of pending messages by group name and consumer name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max stream id
* {@link StreamMessageId#MIN} is used as min stream id
*
* @param consumerName - name of consumer
* @param groupName - name of group
@ -99,7 +125,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param count - amount of messages
* @return list
*/
Mono<List<PendingEntry>> listPending(String groupName, StreamId startId, StreamId endId, int count, String consumerName);
Mono<List<PendingEntry>> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Transfers ownership of pending messages by id to a new consumer
@ -112,7 +138,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - stream ids
* @return stream data mapped by Stream ID
*/
Mono<Map<StreamId, Map<K, V>>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids);
Mono<Map<StreamMessageId, Map<K, V>>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -122,7 +148,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Mono<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamId ... ids);
Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -133,7 +159,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Mono<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamId ... ids);
Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -146,7 +172,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Mono<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids);
Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -160,7 +186,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Mono<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids);
Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
@ -177,7 +203,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param value - value of entry
* @return Stream ID
*/
Mono<StreamId> add(K key, V value);
Mono<StreamMessageId> add(K key, V value);
/**
* Appends a new entry by specified Stream ID
@ -187,7 +213,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param value - value of entry
* @return void
*/
Mono<Void> add(StreamId id, K key, V value);
Mono<Void> add(StreamMessageId id, K key, V value);
/**
* Appends a new entry and returns generated Stream ID.
@ -200,7 +226,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
Mono<StreamId> add(K key, V value, int trimLen, boolean trimStrict);
Mono<StreamMessageId> add(K key, V value, int trimLen, boolean trimStrict);
/**
* Appends a new entry by specified Stream ID.
@ -214,7 +240,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
Mono<Void> add(StreamId id, K key, V value, int trimLen, boolean trimStrict);
Mono<Void> add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict);
/**
* Appends new entries and returns generated Stream ID
@ -222,7 +248,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param entries - entries to add
* @return Stream ID
*/
Mono<StreamId> addAll(Map<K, V> entries);
Mono<StreamMessageId> addAll(Map<K, V> entries);
/**
* Appends new entries by specified Stream ID
@ -231,7 +257,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param entries - entries to add
* @return void
*/
Mono<Void> addAll(StreamId id, Map<K, V> entries);
Mono<Void> addAll(StreamMessageId id, Map<K, V> entries);
/**
* Appends new entries and returns generated Stream ID.
@ -243,7 +269,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
Mono<StreamId> addAll(Map<K, V> entries, int trimLen, boolean trimStrict);
Mono<StreamMessageId> addAll(Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Appends new entries by specified Stream ID.
@ -256,7 +282,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
Mono<Void> addAll(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict);
Mono<Void> addAll(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream IDs.
@ -264,7 +290,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Mono<Map<StreamId, Map<K, V>>> read(StreamId ... ids);
Mono<Map<StreamMessageId, Map<K, V>>> read(StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -273,7 +299,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Mono<Map<StreamId, Map<K, V>>> read(int count, StreamId ... ids);
Mono<Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -284,7 +310,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Mono<Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId ... ids);
Mono<Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -296,7 +322,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Mono<Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId ... ids);
Mono<Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified stream name including this stream.
@ -306,7 +332,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Mono<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, String name2, StreamId id2);
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -318,7 +344,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Mono<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -327,7 +353,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Mono<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, Map<String, StreamId> nameToId);
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -338,7 +364,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Mono<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, String name2, StreamId id2);
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -351,7 +377,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Mono<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -361,7 +387,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Mono<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, Map<String, StreamId> nameToId);
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -374,7 +400,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Mono<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -389,7 +415,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Mono<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -401,7 +427,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Mono<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -415,7 +441,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Mono<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -431,7 +457,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Mono<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -444,7 +470,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Mono<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
@ -453,7 +479,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Mono<Map<StreamId, Map<K, V>>> range(StreamId startId, StreamId endId);
Mono<Map<StreamMessageId, Map<K, V>>> range(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
@ -463,7 +489,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Mono<Map<StreamId, Map<K, V>>> range(int count, StreamId startId, StreamId endId);
Mono<Map<StreamMessageId, Map<K, V>>> range(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
@ -472,7 +498,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Mono<Map<StreamId, Map<K, V>>> rangeReversed(StreamId startId, StreamId endId);
Mono<Map<StreamMessageId, Map<K, V>>> rangeReversed(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
@ -482,6 +508,30 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Mono<Map<StreamId, Map<K, V>>> rangeReversed(int count, StreamId startId, StreamId endId);
Mono<Map<StreamMessageId, Map<K, V>>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Removes messages by id.
*
* @param ids - id of messages to remove
* @return deleted messages amount
*/
Mono<Long> remove(StreamMessageId... ids);
/**
* Trims stream to specified size
*
* @param size - new size of stream
* @return number of deleted messages
*/
Mono<Long> trim(int size);
/**
* Trims stream to few tens of entries more than specified length to trim.
*
* @param size - new size of stream
* @return number of deleted messages
*/
Mono<Long> trimNonStrict(int size);
}

@ -45,13 +45,39 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* Creates consumer group by name and stream id.
* Only new messages after defined stream <code>id</code> will be available for consumers of this group.
* <p>
* {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating
* {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creating
*
* @param groupName - name of group
* @param id - stream id
* @return void
*/
Flowable<Void> createGroup(String groupName, StreamId id);
Flowable<Void> createGroup(String groupName, StreamMessageId id);
/**
* Removes group by name.
*
* @param groupName - name of group
* @return void
*/
Flowable<Void> removeGroup(String groupName);
/**
* Removes consumer of the group by name.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @return number of pending messages owned by consumer
*/
Flowable<Long> removeConsumer(String groupName, String consumerName);
/**
* Updates next message id delivered to consumers.
*
* @param groupName - name of group
* @param id - Stream Message ID
* @return void
*/
Flowable<Void> updateGroupMessageId(String groupName, StreamMessageId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
@ -60,7 +86,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - stream ids
* @return marked messages amount
*/
Flowable<Long> ack(String groupName, StreamId... ids);
Flowable<Long> ack(String groupName, StreamMessageId... ids);
/**
* Returns pending messages by group name
@ -74,8 +100,8 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* Returns list of pending messages by group name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max stream id
* {@link StreamMessageId#MIN} is used as min stream id
*
* @param groupName - name of group
* @param startId - start stream id
@ -83,14 +109,14 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param count - amount of messages
* @return list
*/
Flowable<List<PendingEntry>> listPending(String groupName, StreamId startId, StreamId endId, int count);
Flowable<List<PendingEntry>> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Returns list of pending messages by group name and consumer name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max stream id
* {@link StreamMessageId#MIN} is used as min stream id
*
* @param consumerName - name of consumer
* @param groupName - name of group
@ -99,7 +125,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param count - amount of messages
* @return list
*/
Flowable<List<PendingEntry>> listPending(String groupName, StreamId startId, StreamId endId, int count, String consumerName);
Flowable<List<PendingEntry>> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Transfers ownership of pending messages by id to a new consumer
@ -112,7 +138,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - stream ids
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -122,7 +148,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -133,7 +159,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -146,7 +172,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -160,7 +186,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
@ -177,7 +203,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param value - value of entry
* @return Stream ID
*/
Flowable<StreamId> add(K key, V value);
Flowable<StreamMessageId> add(K key, V value);
/**
* Appends a new entry by specified Stream ID
@ -187,7 +213,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param value - value of entry
* @return void
*/
Flowable<Void> add(StreamId id, K key, V value);
Flowable<Void> add(StreamMessageId id, K key, V value);
/**
* Appends a new entry and returns generated Stream ID.
@ -200,7 +226,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
Flowable<StreamId> add(K key, V value, int trimLen, boolean trimStrict);
Flowable<StreamMessageId> add(K key, V value, int trimLen, boolean trimStrict);
/**
* Appends a new entry by specified Stream ID.
@ -214,7 +240,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
Flowable<Void> add(StreamId id, K key, V value, int trimLen, boolean trimStrict);
Flowable<Void> add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict);
/**
* Appends new entries and returns generated Stream ID
@ -222,7 +248,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param entries - entries to add
* @return Stream ID
*/
Flowable<StreamId> addAll(Map<K, V> entries);
Flowable<StreamMessageId> addAll(Map<K, V> entries);
/**
* Appends new entries by specified Stream ID
@ -231,7 +257,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param entries - entries to add
* @return void
*/
Flowable<Void> addAll(StreamId id, Map<K, V> entries);
Flowable<Void> addAll(StreamMessageId id, Map<K, V> entries);
/**
* Appends new entries and returns generated Stream ID.
@ -243,7 +269,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
Flowable<StreamId> addAll(Map<K, V> entries, int trimLen, boolean trimStrict);
Flowable<StreamMessageId> addAll(Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Appends new entries by specified Stream ID.
@ -256,7 +282,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
Flowable<Void> addAll(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict);
Flowable<Void> addAll(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream IDs.
@ -264,7 +290,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> read(StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> read(StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -273,7 +299,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> read(int count, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -284,7 +310,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -296,7 +322,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified stream name including this stream.
@ -306,7 +332,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, String name2, StreamId id2);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -318,7 +344,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -327,7 +353,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, Map<String, StreamId> nameToId);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -338,7 +364,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, String name2, StreamId id2);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -351,7 +377,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -361,7 +387,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, Map<String, StreamId> nameToId);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -374,7 +400,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -389,7 +415,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -401,7 +427,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -415,7 +441,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -431,7 +457,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -444,7 +470,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
@ -453,7 +479,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> range(StreamId startId, StreamId endId);
Flowable<Map<StreamMessageId, Map<K, V>>> range(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
@ -463,7 +489,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> range(int count, StreamId startId, StreamId endId);
Flowable<Map<StreamMessageId, Map<K, V>>> range(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
@ -472,7 +498,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> rangeReversed(StreamId startId, StreamId endId);
Flowable<Map<StreamMessageId, Map<K, V>>> rangeReversed(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
@ -482,6 +508,30 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> rangeReversed(int count, StreamId startId, StreamId endId);
Flowable<Map<StreamMessageId, Map<K, V>>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Removes messages by id.
*
* @param ids - id of messages to remove
* @return deleted messages amount
*/
Flowable<Long> remove(StreamMessageId... ids);
/**
* Trims stream to specified size
*
* @param size - new size of stream
* @return number of deleted messages
*/
Flowable<Long> trim(int size);
/**
* Trims stream to few tens of entries more than specified length to trim.
*
* @param size - new size of stream
* @return number of deleted messages
*/
Flowable<Long> trimNonStrict(int size);
}

@ -21,34 +21,34 @@ package org.redisson.api;
* @author Nikita Koksharov
*
*/
public class StreamId {
public class StreamMessageId {
/**
* Defines minimal id. Used in {@link RStream#range} and {@link RStreamAsync#rangeAsync} methods
*/
public static final StreamId MIN = new StreamId(-1);
public static final StreamMessageId MIN = new StreamMessageId(-1);
/**
* Defines maximal id. Used in {@link RStream#range} and {@link RStreamAsync#rangeAsync} methods
*/
public static final StreamId MAX = new StreamId(-1);
public static final StreamMessageId MAX = new StreamMessageId(-1);
/**
* Defines latest id to receive Stream entries added since method invocation.
* <p>
* Used in {@link RStream#read} and {@link RStreamAsync#readAsync} methods
*/
public static final StreamId NEWEST = new StreamId(-1);
public static final StreamMessageId NEWEST = new StreamMessageId(-1);
private long id0;
private long id1;
public StreamId(long id0) {
public StreamMessageId(long id0) {
super();
this.id0 = id0;
}
public StreamId(long id0, long id1) {
public StreamMessageId(long id0, long id1) {
super();
this.id0 = id0;
this.id1 = id1;
@ -89,7 +89,7 @@ public class StreamId {
return false;
if (getClass() != obj.getClass())
return false;
StreamId other = (StreamId) obj;
StreamMessageId other = (StreamMessageId) obj;
if (id0 != other.id0)
return false;
if (id1 != other.id1)

@ -23,7 +23,7 @@ import java.util.Map.Entry;
import java.util.Set;
import org.redisson.api.RType;
import org.redisson.api.StreamId;
import org.redisson.api.StreamMessageId;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor;
@ -70,6 +70,7 @@ import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder;
import org.redisson.client.protocol.decoder.SlotsDecoder;
import org.redisson.client.protocol.decoder.StreamIdDecoder;
import org.redisson.client.protocol.decoder.StreamIdListDecoder;
import org.redisson.client.protocol.decoder.StreamResultDecoder;
import org.redisson.client.protocol.decoder.StringDataDecoder;
import org.redisson.client.protocol.decoder.StringListReplayDecoder;
@ -325,17 +326,17 @@ public interface RedisCommands {
RedisCommand<Boolean> SETNX = new RedisCommand<Boolean>("SETNX", new BooleanReplayConvertor());
RedisCommand<Void> PSETEX = new RedisCommand<Void>("PSETEX", new VoidReplayConvertor());
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XRANGE = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XRANGE",
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XRANGE = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XRANGE",
new ListMultiDecoder(
new ObjectDecoder(new StreamIdDecoder()),
new ObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(ListMultiDecoder.RESET),
new ObjectMapJoinDecoder()));
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XREVRANGE = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XREVRANGE",
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREVRANGE = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREVRANGE",
XRANGE.getReplayMultiDecoder());
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XREAD = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XREAD",
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREAD = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREAD",
new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()),
@ -345,9 +346,9 @@ public interface RedisCommands {
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX),
new StreamResultDecoder()));
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XREAD_BLOCKING = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XREAD", XREAD.getReplayMultiDecoder());
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREAD_BLOCKING = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREAD", XREAD.getReplayMultiDecoder());
RedisCommand<Map<StreamId, Map<Object, Object>>> XREAD_SINGLE = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREAD",
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREAD_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREAD",
new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()),
@ -357,9 +358,9 @@ public interface RedisCommands {
new ObjectMapReplayDecoder(),
new StreamResultDecoder()));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREAD_BLOCKING_SINGLE = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREAD", XREAD_SINGLE.getReplayMultiDecoder());
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREAD_BLOCKING_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREAD", XREAD_SINGLE.getReplayMultiDecoder());
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XREADGROUP = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XREADGROUP",
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREADGROUP = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREADGROUP",
new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()),
@ -369,7 +370,7 @@ public interface RedisCommands {
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX),
new StreamResultDecoder()));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREADGROUP_BLOCKING = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREADGROUP",
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREADGROUP_BLOCKING = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREADGROUP",
new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()),
@ -379,7 +380,7 @@ public interface RedisCommands {
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX),
new StreamResultDecoder()));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREADGROUP_SINGLE = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREADGROUP",
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREADGROUP_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREADGROUP",
new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()),
@ -389,24 +390,29 @@ public interface RedisCommands {
new ObjectMapReplayDecoder(),
new StreamResultDecoder()));
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XCLAIM = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XCLAIM",
RedisCommand<List<StreamMessageId>> XCLAIM_IDS = new RedisCommand<List<StreamMessageId>>("XCLAIM", new StreamIdListDecoder());
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XCLAIM = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XCLAIM",
new ListMultiDecoder(
new ObjectDecoder(new StreamIdDecoder()),
new ObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(ListMultiDecoder.RESET),
new ObjectMapJoinDecoder()));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREADGROUP_BLOCKING_SINGLE = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREADGROUP",
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREADGROUP_BLOCKING_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREADGROUP",
XREADGROUP_SINGLE.getReplayMultiDecoder());
Set<RedisCommand> BLOCKING_COMMANDS = new HashSet<RedisCommand>(Arrays.asList(
XREAD_BLOCKING_SINGLE, XREAD_BLOCKING, XREADGROUP_BLOCKING_SINGLE, XREADGROUP_BLOCKING));
RedisStrictCommand<StreamId> XADD = new RedisStrictCommand<StreamId>("XADD", new StreamIdConvertor());
RedisStrictCommand<StreamMessageId> XADD = new RedisStrictCommand<StreamMessageId>("XADD", new StreamIdConvertor());
RedisStrictCommand<Void> XGROUP = new RedisStrictCommand<Void>("XGROUP", new VoidReplayConvertor());
RedisStrictCommand<Long> XGROUP_LONG = new RedisStrictCommand<Long>("XGROUP");
RedisStrictCommand<Void> XADD_VOID = new RedisStrictCommand<Void>("XADD", new VoidReplayConvertor());
RedisStrictCommand<Long> XLEN = new RedisStrictCommand<Long>("XLEN");
RedisStrictCommand<Long> XACK = new RedisStrictCommand<Long>("XACK");
RedisStrictCommand<Long> XDEL = new RedisStrictCommand<Long>("XDEL");
RedisStrictCommand<Long> XTRIM = new RedisStrictCommand<Long>("XTRIM");
RedisCommand<Object> XPENDING = new RedisCommand<Object>("XPENDING",
new ListMultiDecoder(new ObjectListReplayDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new PendingResultDecoder()));
RedisCommand<Object> XPENDING_ENTRIES = new RedisCommand<Object>("XPENDING",

@ -15,19 +15,19 @@
*/
package org.redisson.client.protocol.convertor;
import org.redisson.api.StreamId;
import org.redisson.api.StreamMessageId;
/**
*
* @author Nikita Koksharov
*
*/
public class StreamIdConvertor extends SingleConvertor<StreamId> {
public class StreamIdConvertor extends SingleConvertor<StreamMessageId> {
@Override
public StreamId convert(Object id) {
public StreamMessageId convert(Object id) {
String[] parts = id.toString().split("-");
return new StreamId(Long.valueOf(parts[0]), Long.valueOf(parts[1]));
return new StreamMessageId(Long.valueOf(parts[0]), Long.valueOf(parts[1]));
}
}

@ -17,7 +17,7 @@ package org.redisson.client.protocol.decoder;
import java.io.IOException;
import org.redisson.api.StreamId;
import org.redisson.api.StreamMessageId;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
@ -35,7 +35,7 @@ public class StreamIdDecoder implements Decoder<Object> {
public Object decode(ByteBuf buf, State state) throws IOException {
String id = (String) StringCodec.INSTANCE.getValueDecoder().decode(buf, state);
String[] parts = id.toString().split("-");
return new StreamId(Long.valueOf(parts[0]), Long.valueOf(parts[1]));
return new StreamMessageId(Long.valueOf(parts[0]), Long.valueOf(parts[1]));
}
}

@ -0,0 +1,50 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.ArrayList;
import java.util.List;
import org.redisson.api.StreamMessageId;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.convertor.StreamIdConvertor;
/**
*
* @author Nikita Koksharov
*
*/
public class StreamIdListDecoder implements MultiDecoder<List<StreamMessageId>> {
private final StreamIdConvertor convertor = new StreamIdConvertor();
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
@Override
public List<StreamMessageId> decode(List<Object> parts, State state) {
List<StreamMessageId> ids = new ArrayList<StreamMessageId>();
for (Object id : parts) {
StreamMessageId streamMessageId = convertor.convert(id);
ids.add(streamMessageId);
}
return ids;
}
}

@ -19,7 +19,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.redisson.api.StreamId;
import org.redisson.api.StreamMessageId;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
@ -33,7 +33,7 @@ public class StreamResultDecoder implements MultiDecoder<Object> {
@Override
public Object decode(List<Object> parts, State state) {
if (!parts.isEmpty()) {
Map<String, Map<StreamId, Map<Object, Object>>> result = (Map<String, Map<StreamId, Map<Object, Object>>>) parts.get(0);
Map<String, Map<StreamMessageId, Map<Object, Object>>> result = (Map<String, Map<StreamMessageId, Map<Object, Object>>>) parts.get(0);
return result.values().iterator().next();
}
return Collections.emptyMap();

@ -88,89 +88,96 @@ public class DNSMonitor {
return;
}
final AtomicInteger counter = new AtomicInteger(masters.size() + slaves.size());
for (final Entry<URI, InetSocketAddress> entry : masters.entrySet()) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort()));
resolveFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (counter.decrementAndGet() == 0) {
monitorDnsChange();
}
AtomicInteger counter = new AtomicInteger(masters.size() + slaves.size());
monitorMasters(counter);
monitorSlaves(counter);
}
if (!future.isSuccess()) {
log.error("Unable to resolve " + entry.getKey().getHost(), future.cause());
return;
}
}, dnsMonitoringInterval, TimeUnit.MILLISECONDS);
}
InetSocketAddress currentMasterAddr = entry.getValue();
InetSocketAddress newMasterAddr = future.getNow();
if (!newMasterAddr.getAddress().equals(currentMasterAddr.getAddress())) {
log.info("Detected DNS change. Master {} has changed ip from {} to {}",
entry.getKey(), currentMasterAddr.getAddress().getHostAddress(), newMasterAddr.getAddress().getHostAddress());
MasterSlaveEntry masterSlaveEntry = connectionManager.getEntry(currentMasterAddr);
if (masterSlaveEntry == null) {
log.error("Unable to find master entry for {}", currentMasterAddr);
return;
}
masterSlaveEntry.changeMaster(newMasterAddr, entry.getKey());
masters.put(entry.getKey(), newMasterAddr);
}
private void monitorMasters(final AtomicInteger counter) {
for (final Entry<URI, InetSocketAddress> entry : masters.entrySet()) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort()));
resolveFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (counter.decrementAndGet() == 0) {
monitorDnsChange();
}
if (!future.isSuccess()) {
log.error("Unable to resolve " + entry.getKey().getHost(), future.cause());
return;
}
InetSocketAddress currentMasterAddr = entry.getValue();
InetSocketAddress newMasterAddr = future.getNow();
if (!newMasterAddr.getAddress().equals(currentMasterAddr.getAddress())) {
log.info("Detected DNS change. Master {} has changed ip from {} to {}",
entry.getKey(), currentMasterAddr.getAddress().getHostAddress(), newMasterAddr.getAddress().getHostAddress());
MasterSlaveEntry masterSlaveEntry = connectionManager.getEntry(currentMasterAddr);
if (masterSlaveEntry == null) {
log.error("Unable to find master entry for {}", currentMasterAddr);
return;
}
});
masterSlaveEntry.changeMaster(newMasterAddr, entry.getKey());
masters.put(entry.getKey(), newMasterAddr);
}
}
});
}
}
for (final Entry<URI, InetSocketAddress> entry : slaves.entrySet()) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort()));
resolveFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (counter.decrementAndGet() == 0) {
monitorDnsChange();
}
if (!future.isSuccess()) {
log.error("Unable to resolve " + entry.getKey().getHost(), future.cause());
return;
private void monitorSlaves(final AtomicInteger counter) {
for (final Entry<URI, InetSocketAddress> entry : slaves.entrySet()) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort()));
resolveFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (counter.decrementAndGet() == 0) {
monitorDnsChange();
}
if (!future.isSuccess()) {
log.error("Unable to resolve " + entry.getKey().getHost(), future.cause());
return;
}
final InetSocketAddress currentSlaveAddr = entry.getValue();
final InetSocketAddress newSlaveAddr = future.getNow();
if (!newSlaveAddr.getAddress().equals(currentSlaveAddr.getAddress())) {
log.info("Detected DNS change. Slave {} has changed ip from {} to {}",
entry.getKey().getHost(), currentSlaveAddr.getAddress().getHostAddress(), newSlaveAddr.getAddress().getHostAddress());
for (final MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) {
if (!masterSlaveEntry.hasSlave(currentSlaveAddr)) {
continue;
}
final InetSocketAddress currentSlaveAddr = entry.getValue();
final InetSocketAddress newSlaveAddr = future.getNow();
if (!newSlaveAddr.getAddress().equals(currentSlaveAddr.getAddress())) {
log.info("Detected DNS change. Slave {} has changed ip from {} to {}",
entry.getKey().getHost(), currentSlaveAddr.getAddress().getHostAddress(), newSlaveAddr.getAddress().getHostAddress());
for (final MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) {
if (!masterSlaveEntry.hasSlave(currentSlaveAddr)) {
continue;
}
if (masterSlaveEntry.hasSlave(newSlaveAddr)) {
masterSlaveEntry.slaveUp(newSlaveAddr, FreezeReason.MANAGER);
masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER);
} else {
RFuture<Void> addFuture = masterSlaveEntry.addSlave(newSlaveAddr, entry.getKey());
addFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't add slave: " + newSlaveAddr, future.cause());
return;
}
if (masterSlaveEntry.hasSlave(newSlaveAddr)) {
masterSlaveEntry.slaveUp(newSlaveAddr, FreezeReason.MANAGER);
masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER);
} else {
RFuture<Void> addFuture = masterSlaveEntry.addSlave(newSlaveAddr, entry.getKey());
addFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't add slave: " + newSlaveAddr, future.cause());
return;
}
masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER);
}
});
}
break;
}
slaves.put(entry.getKey(), newSlaveAddr);
});
}
break;
}
});
slaves.put(entry.getKey(), newSlaveAddr);
}
}
}
}, dnsMonitoringInterval, TimeUnit.MILLISECONDS);
});
}
}

@ -12,10 +12,79 @@ import org.junit.Test;
import org.redisson.api.PendingEntry;
import org.redisson.api.PendingResult;
import org.redisson.api.RStream;
import org.redisson.api.StreamId;
import org.redisson.api.StreamMessageId;
import org.redisson.client.RedisException;
public class RedissonStreamTest extends BaseTest {
@Test
public void testUpdateGroupMessageId() {
RStream<String, String> stream = redisson.getStream("test");
StreamMessageId id = stream.add("0", "0");
stream.createGroup("testGroup");
StreamMessageId id1 = stream.add("1", "1");
System.out.println("id1 " + id1);
StreamMessageId id2 = stream.add("2", "2");
System.out.println("id2 " + id2);
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
stream.updateGroupMessageId("testGroup", id);
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
assertThat(s2.size()).isEqualTo(2);
}
@Test
public void testRemoveConsumer() {
RStream<String, String> stream = redisson.getStream("test");
stream.add("0", "0");
stream.createGroup("testGroup");
StreamMessageId id1 = stream.add("1", "1");
StreamMessageId id2 = stream.add("2", "2");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
assertThat(stream.removeConsumer("testGroup", "consumer1")).isEqualTo(2);
assertThat(stream.removeConsumer("testGroup", "consumer2")).isZero();
}
@Test(expected = RedisException.class)
public void testRemoveGroup() {
RStream<String, String> stream = redisson.getStream("test");
stream.add("0", "0");
stream.createGroup("testGroup");
StreamMessageId id1 = stream.add("1", "1");
StreamMessageId id2 = stream.add("2", "2");
stream.removeGroup("testGroup");
stream.readGroup("testGroup", "consumer1");
}
@Test
public void testRemoveMessages() {
RStream<String, String> stream = redisson.getStream("test");
StreamMessageId id1 = stream.add("0", "0");
StreamMessageId id2 = stream.add("1", "1");
assertThat(stream.size()).isEqualTo(2);
assertThat(stream.remove(id1, id2)).isEqualTo(2);
assertThat(stream.size()).isZero();
}
@Test
public void testClaim() {
RStream<String, String> stream = redisson.getStream("test");
@ -24,19 +93,19 @@ public class RedissonStreamTest extends BaseTest {
stream.createGroup("testGroup");
StreamId id1 = stream.add("1", "1");
StreamId id2 = stream.add("2", "2");
StreamMessageId id1 = stream.add("1", "1");
StreamMessageId id2 = stream.add("2", "2");
Map<StreamId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
StreamId id3 = stream.add("3", "33");
StreamId id4 = stream.add("4", "44");
StreamMessageId id3 = stream.add("3", "33");
StreamMessageId id4 = stream.add("4", "44");
Map<StreamId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
assertThat(s2.size()).isEqualTo(2);
Map<StreamId, Map<String, String>> res = stream.claimPending("testGroup", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4);
Map<StreamMessageId, Map<String, String>> res = stream.claim("testGroup", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4);
assertThat(res.size()).isEqualTo(2);
assertThat(res.keySet()).containsExactly(id3, id4);
for (Map<String, String> map : res.values()) {
@ -45,6 +114,31 @@ public class RedissonStreamTest extends BaseTest {
}
}
@Test
public void testClaimIds() {
RStream<String, String> stream = redisson.getStream("test");
stream.add("0", "0");
stream.createGroup("testGroup");
StreamMessageId id1 = stream.add("1", "1");
StreamMessageId id2 = stream.add("2", "2");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
StreamMessageId id3 = stream.add("3", "33");
StreamMessageId id4 = stream.add("4", "44");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
assertThat(s2.size()).isEqualTo(2);
List<StreamMessageId> res = stream.fastClaim("testGroup", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4);
assertThat(res.size()).isEqualTo(2);
assertThat(res).containsExactly(id3, id4);
}
@Test
public void testPending() {
RStream<String, String> stream = redisson.getStream("test");
@ -53,16 +147,16 @@ public class RedissonStreamTest extends BaseTest {
stream.createGroup("testGroup");
StreamId id1 = stream.add("1", "1");
StreamId id2 = stream.add("2", "2");
StreamMessageId id1 = stream.add("1", "1");
StreamMessageId id2 = stream.add("2", "2");
Map<StreamId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
StreamId id3 = stream.add("3", "3");
StreamId id4 = stream.add("4", "4");
StreamMessageId id3 = stream.add("3", "3");
StreamMessageId id4 = stream.add("4", "4");
Map<StreamId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
assertThat(s2.size()).isEqualTo(2);
PendingResult pi = stream.listPending("testGroup");
@ -71,7 +165,7 @@ public class RedissonStreamTest extends BaseTest {
assertThat(pi.getTotal()).isEqualTo(4);
assertThat(pi.getConsumerNames().keySet()).containsExactly("consumer1", "consumer2");
List<PendingEntry> list = stream.listPending("testGroup", StreamId.MIN, StreamId.MAX, 10);
List<PendingEntry> list = stream.listPending("testGroup", StreamMessageId.MIN, StreamMessageId.MAX, 10);
assertThat(list.size()).isEqualTo(4);
for (PendingEntry pendingEntry : list) {
assertThat(pendingEntry.getId()).isIn(id1, id2, id3, id4);
@ -79,7 +173,7 @@ public class RedissonStreamTest extends BaseTest {
assertThat(pendingEntry.getLastTimeDelivered()).isOne();
}
List<PendingEntry> list2 = stream.listPending("testGroup", StreamId.MIN, StreamId.MAX, 10, "consumer1");
List<PendingEntry> list2 = stream.listPending("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 10);
assertThat(list2.size()).isEqualTo(2);
for (PendingEntry pendingEntry : list2) {
assertThat(pendingEntry.getId()).isIn(id1, id2);
@ -96,10 +190,10 @@ public class RedissonStreamTest extends BaseTest {
stream.createGroup("testGroup");
StreamId id1 = stream.add("1", "1");
StreamId id2 = stream.add("2", "2");
StreamMessageId id1 = stream.add("1", "1");
StreamMessageId id2 = stream.add("2", "2");
Map<StreamId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
assertThat(stream.ack("testGroup", id1, id2)).isEqualTo(2);
@ -109,7 +203,7 @@ public class RedissonStreamTest extends BaseTest {
public void testReadGroup() {
RStream<String, String> stream = redisson.getStream("test");
StreamId id0 = stream.add("0", "0");
StreamMessageId id0 = stream.add("0", "0");
stream.createGroup("testGroup", id0);
@ -117,7 +211,7 @@ public class RedissonStreamTest extends BaseTest {
stream.add("2", "2");
stream.add("3", "3");
Map<StreamId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.values().iterator().next().keySet()).containsAnyOf("1", "2", "3");
assertThat(s.size()).isEqualTo(3);
@ -125,14 +219,14 @@ public class RedissonStreamTest extends BaseTest {
stream.add("2", "2");
stream.add("3", "3");
Map<StreamId, Map<String, String>> s1 = stream.readGroup("testGroup", "consumer1", 1);
Map<StreamMessageId, Map<String, String>> s1 = stream.readGroup("testGroup", "consumer1", 1);
assertThat(s1.size()).isEqualTo(1);
StreamId id = stream.add("1", "1");
StreamMessageId id = stream.add("1", "1");
stream.add("2", "2");
stream.add("3", "3");
Map<StreamId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer1", id);
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer1", id);
assertThat(s2.size()).isEqualTo(2);
}
@ -144,18 +238,18 @@ public class RedissonStreamTest extends BaseTest {
Map<String, String> entries1 = new HashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, false);
stream.addAll(new StreamMessageId(1), entries1, 1, false);
assertThat(stream.size()).isEqualTo(1);
Map<String, String> entries2 = new HashMap<>();
entries2.put("5", "55");
entries2.put("7", "77");
stream.addAll(new StreamId(2), entries2, 1, false);
stream.addAll(new StreamMessageId(2), entries2, 1, false);
Map<StreamId, Map<String, String>> r2 = stream.rangeReversed(10, StreamId.MAX, StreamId.MIN);
assertThat(r2.keySet()).containsExactly(new StreamId(2), new StreamId(1));
assertThat(r2.get(new StreamId(1))).isEqualTo(entries1);
assertThat(r2.get(new StreamId(2))).isEqualTo(entries2);
Map<StreamMessageId, Map<String, String>> r2 = stream.rangeReversed(10, StreamMessageId.MAX, StreamMessageId.MIN);
assertThat(r2.keySet()).containsExactly(new StreamMessageId(2), new StreamMessageId(1));
assertThat(r2.get(new StreamMessageId(1))).isEqualTo(entries1);
assertThat(r2.get(new StreamMessageId(2))).isEqualTo(entries2);
}
@Test
@ -166,22 +260,22 @@ public class RedissonStreamTest extends BaseTest {
Map<String, String> entries1 = new HashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, false);
stream.addAll(new StreamMessageId(1), entries1, 1, false);
assertThat(stream.size()).isEqualTo(1);
Map<String, String> entries2 = new HashMap<>();
entries2.put("5", "55");
entries2.put("7", "77");
stream.addAll(new StreamId(2), entries2, 1, false);
stream.addAll(new StreamMessageId(2), entries2, 1, false);
Map<StreamId, Map<String, String>> r = stream.range(10, new StreamId(0), new StreamId(1));
Map<StreamMessageId, Map<String, String>> r = stream.range(10, new StreamMessageId(0), new StreamMessageId(1));
assertThat(r).hasSize(1);
assertThat(r.get(new StreamId(1))).isEqualTo(entries1);
assertThat(r.get(new StreamMessageId(1))).isEqualTo(entries1);
Map<StreamId, Map<String, String>> r2 = stream.range(10, StreamId.MIN, StreamId.MAX);
assertThat(r2.keySet()).containsExactly(new StreamId(1), new StreamId(2));
assertThat(r2.get(new StreamId(1))).isEqualTo(entries1);
assertThat(r2.get(new StreamId(2))).isEqualTo(entries2);
Map<StreamMessageId, Map<String, String>> r2 = stream.range(10, StreamMessageId.MIN, StreamMessageId.MAX);
assertThat(r2.keySet()).containsExactly(new StreamMessageId(1), new StreamMessageId(2));
assertThat(r2.get(new StreamMessageId(1))).isEqualTo(entries1);
assertThat(r2.get(new StreamMessageId(2))).isEqualTo(entries2);
}
@Test
@ -201,16 +295,16 @@ public class RedissonStreamTest extends BaseTest {
e.printStackTrace();
}
stream.addAll(new StreamId(1), entries1);
stream.addAll(new StreamMessageId(1), entries1);
}
};
t.start();
long start = System.currentTimeMillis();
Map<String, Map<StreamId, Map<String, String>>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamId(0), "test1", StreamId.NEWEST);
Map<String, Map<StreamMessageId, Map<String, String>>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamMessageId(0), "test1", StreamMessageId.NEWEST);
assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L);
assertThat(s).hasSize(1);
assertThat(s.get("test").get(new StreamId(1))).isEqualTo(entries1);
assertThat(s.get("test").get(new StreamMessageId(1))).isEqualTo(entries1);
}
@Test
@ -230,16 +324,16 @@ public class RedissonStreamTest extends BaseTest {
e.printStackTrace();
}
stream.addAll(new StreamId(1), entries1);
stream.addAll(new StreamMessageId(1), entries1);
}
};
t.start();
long start = System.currentTimeMillis();
Map<StreamId, Map<String, String>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamId(0));
Map<StreamMessageId, Map<String, String>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamMessageId(0));
assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L);
assertThat(s).hasSize(1);
assertThat(s.get(new StreamId(1))).isEqualTo(entries1);
assertThat(s.get(new StreamMessageId(1))).isEqualTo(entries1);
}
@Test
@ -250,20 +344,20 @@ public class RedissonStreamTest extends BaseTest {
Map<String, String> entries1 = new HashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, false);
stream.addAll(new StreamMessageId(1), entries1, 1, false);
assertThat(stream.size()).isEqualTo(1);
Map<String, String> entries2 = new HashMap<>();
entries2.put("5", "55");
entries2.put("7", "77");
stream.addAll(new StreamId(2), entries2, 1, false);
stream.addAll(new StreamMessageId(2), entries2, 1, false);
assertThat(stream.size()).isEqualTo(2);
}
@Test
public void testReadMultiKeysEmpty() {
RStream<String, String> stream = redisson.getStream("test2");
Map<String, Map<StreamId, Map<String, String>>> s = stream.read(10, new StreamId(0), "test1", new StreamId(0));
Map<String, Map<StreamMessageId, Map<String, String>>> s = stream.read(10, new StreamMessageId(0), "test1", new StreamMessageId(0));
assertThat(s).isEmpty();
}
@ -282,7 +376,7 @@ public class RedissonStreamTest extends BaseTest {
entries2.put("6", "66");
stream2.addAll(entries2);
Map<String, Map<StreamId, Map<String, String>>> s = stream2.read(10, new StreamId(0), "test1", new StreamId(0));
Map<String, Map<StreamMessageId, Map<String, String>>> s = stream2.read(10, new StreamMessageId(0), "test1", new StreamMessageId(0));
assertThat(s).hasSize(2);
assertThat(s.get("test1").values().iterator().next()).isEqualTo(entries1);
assertThat(s.get("test2").values().iterator().next()).isEqualTo(entries2);
@ -295,24 +389,24 @@ public class RedissonStreamTest extends BaseTest {
Map<String, String> entries1 = new LinkedHashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, false);
stream.addAll(new StreamMessageId(1), entries1, 1, false);
Map<String, String> entries2 = new LinkedHashMap<>();
entries2.put("5", "55");
entries2.put("7", "77");
stream.addAll(new StreamId(2), entries2, 1, false);
stream.addAll(new StreamMessageId(2), entries2, 1, false);
Map<String, String> entries3 = new LinkedHashMap<>();
entries3.put("15", "05");
entries3.put("17", "07");
stream.addAll(new StreamId(3), entries3, 1, false);
stream.addAll(new StreamMessageId(3), entries3, 1, false);
Map<StreamId, Map<String, String>> result = stream.read(10, new StreamId(0, 0));
Map<StreamMessageId, Map<String, String>> result = stream.read(10, new StreamMessageId(0, 0));
assertThat(result).hasSize(3);
assertThat(result.get(new StreamId(4))).isNull();
assertThat(result.get(new StreamId(1))).isEqualTo(entries1);
assertThat(result.get(new StreamId(2))).isEqualTo(entries2);
assertThat(result.get(new StreamId(3))).isEqualTo(entries3);
assertThat(result.get(new StreamMessageId(4))).isNull();
assertThat(result.get(new StreamMessageId(1))).isEqualTo(entries1);
assertThat(result.get(new StreamMessageId(2))).isEqualTo(entries2);
assertThat(result.get(new StreamMessageId(3))).isEqualTo(entries3);
}
@Test
@ -321,25 +415,25 @@ public class RedissonStreamTest extends BaseTest {
Map<String, String> entries1 = new LinkedHashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, true);
stream.addAll(new StreamMessageId(1), entries1, 1, true);
Map<StreamId, Map<String, String>> result = stream.read(10, new StreamId(0, 0));
Map<StreamMessageId, Map<String, String>> result = stream.read(10, new StreamMessageId(0, 0));
assertThat(result).hasSize(1);
assertThat(result.get(new StreamId(4))).isNull();
assertThat(result.get(new StreamId(1))).isEqualTo(entries1);
assertThat(result.get(new StreamMessageId(4))).isNull();
assertThat(result.get(new StreamMessageId(1))).isEqualTo(entries1);
}
@Test
public void testReadEmpty() {
RStream<String, String> stream2 = redisson.getStream("test");
Map<StreamId, Map<String, String>> result2 = stream2.read(10, new StreamId(0, 0));
Map<StreamMessageId, Map<String, String>> result2 = stream2.read(10, new StreamMessageId(0, 0));
assertThat(result2).isEmpty();
}
@Test
public void testAdd() {
RStream<String, String> stream = redisson.getStream("test1");
StreamId s = stream.add("12", "33");
StreamMessageId s = stream.add("12", "33");
assertThat(s.getId0()).isNotNegative();
assertThat(s.getId1()).isNotNegative();
assertThat(stream.size()).isEqualTo(1);
@ -353,13 +447,13 @@ public class RedissonStreamTest extends BaseTest {
Map<String, String> entries = new HashMap<>();
entries.put("6", "61");
entries.put("4", "41");
stream.addAll(new StreamId(12, 42), entries, 10, false);
stream.addAll(new StreamMessageId(12, 42), entries, 10, false);
assertThat(stream.size()).isEqualTo(1);
entries.clear();
entries.put("1", "11");
entries.put("3", "31");
stream.addAll(new StreamId(Long.MAX_VALUE), entries, 1, false);
stream.addAll(new StreamMessageId(Long.MAX_VALUE), entries, 1, false);
assertThat(stream.size()).isEqualTo(2);
}

Loading…
Cancel
Save