Feature - remove method added to RStream object. #1490

pull/1792/head
Nikita Koksharov 6 years ago
parent 2b2c6b341b
commit 4d5842b0b0

@ -27,7 +27,7 @@ import org.redisson.api.PendingEntry;
import org.redisson.api.PendingResult;
import org.redisson.api.RFuture;
import org.redisson.api.RStream;
import org.redisson.api.StreamId;
import org.redisson.api.StreamMessageId;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
@ -69,25 +69,25 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
@Override
public RFuture<Void> createGroupAsync(String groupName) {
return createGroupAsync(groupName, StreamId.NEWEST);
return createGroupAsync(groupName, StreamMessageId.NEWEST);
}
@Override
public void createGroup(String groupName, StreamId id) {
public void createGroup(String groupName, StreamMessageId id) {
get(createGroupAsync(groupName, id));
}
@Override
public RFuture<Void> createGroupAsync(String groupName, StreamId id) {
public RFuture<Void> createGroupAsync(String groupName, StreamMessageId id) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "CREATE", getName(), groupName, id);
}
@Override
public RFuture<Long> ackAsync(String groupName, StreamId... ids) {
public RFuture<Long> ackAsync(String groupName, StreamMessageId... ids) {
List<Object> params = new ArrayList<Object>();
params.add(getName());
params.add(groupName);
for (StreamId id : ids) {
for (StreamMessageId id : ids) {
params.add(id);
}
@ -95,7 +95,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Long ack(String groupName, StreamId... id) {
public long ack(String groupName, StreamMessageId... id) {
return get(ackAsync(groupName, id));
}
@ -110,37 +110,37 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count,
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, int count,
String consumerName) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, startId, endId, count, consumerName);
}
@Override
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count) {
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, int count) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, startId, endId, count);
}
@Override
public List<PendingEntry> listPending(String groupName, StreamId startId, StreamId endId, int count) {
public List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count) {
return get(listPendingAsync(groupName, startId, endId, count));
}
@Override
public List<PendingEntry> listPending(String groupName, StreamId startId, StreamId endId, int count,
public List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count,
String consumerName) {
return get(listPendingAsync(groupName, startId, endId, count, consumerName));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> claimAsync(String groupName, String consumerName, long idleTime,
TimeUnit idleTimeUnit, StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> claimAsync(String groupName, String consumerName, long idleTime,
TimeUnit idleTimeUnit, StreamMessageId... ids) {
List<Object> params = new ArrayList<Object>();
params.add(getName());
params.add(groupName);
params.add(consumerName);
params.add(idleTimeUnit.toMillis(idleTime));
for (StreamId id : ids) {
for (StreamMessageId id : ids) {
params.add(id.toString());
}
@ -148,30 +148,30 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<StreamId, Map<K, V>> claimPending(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit,
StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> claimPending(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit,
StreamMessageId... ids) {
return get(claimAsync(groupName, consumerName, idleTime, idleTimeUnit, ids));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamMessageId... ids) {
return readGroupAsync(groupName, consumerName, 0, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId... ids) {
return readGroupAsync(groupName, consumerName, count, 0, null, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit,
StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit,
StreamMessageId... ids) {
return readGroupAsync(groupName, consumerName, 0, timeout, unit, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit,
StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit,
StreamMessageId... ids) {
List<Object> params = new ArrayList<Object>();
params.add("GROUP");
params.add(groupName);
@ -194,7 +194,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
params.add(">");
}
for (StreamId id : ids) {
for (StreamMessageId id : ids) {
params.add(id.toString());
}
@ -205,62 +205,62 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, StreamMessageId... ids) {
return get(readGroupAsync(groupName, consumerName, ids));
}
@Override
public Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamMessageId... ids) {
return get(readGroupAsync(groupName, consumerName, count, ids));
}
@Override
public Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId... ids) {
return get(readGroupAsync(groupName, consumerName, timeout, unit, ids));
}
@Override
public Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit,
StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit,
StreamMessageId... ids) {
return get(readGroupAsync(groupName, consumerName, count, timeout, unit, ids));
}
@Override
public StreamId addAll(Map<K, V> entries) {
public StreamMessageId addAll(Map<K, V> entries) {
return addAll(entries, 0, false);
}
@Override
public RFuture<StreamId> addAllAsync(Map<K, V> entries) {
public RFuture<StreamMessageId> addAllAsync(Map<K, V> entries) {
return addAllAsync(entries, 0, false);
}
@Override
public void addAll(StreamId id, Map<K, V> entries) {
public void addAll(StreamMessageId id, Map<K, V> entries) {
addAll(id, entries, 0, false);
}
@Override
public RFuture<Void> addAllAsync(StreamId id, Map<K, V> entries) {
public RFuture<Void> addAllAsync(StreamMessageId id, Map<K, V> entries) {
return addAllAsync(id, entries, 0, false);
}
@Override
public StreamId addAll(Map<K, V> entries, int trimLen, boolean trimStrict) {
public StreamMessageId addAll(Map<K, V> entries, int trimLen, boolean trimStrict) {
return get(addAllAsync(entries, trimLen, trimStrict));
}
@Override
public RFuture<StreamId> addAllAsync(Map<K, V> entries, int trimLen, boolean trimStrict) {
public RFuture<StreamMessageId> addAllAsync(Map<K, V> entries, int trimLen, boolean trimStrict) {
return addAllCustomAsync(null, entries, trimLen, trimStrict);
}
@Override
public void addAll(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
public void addAll(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
get(addAllAsync(id, entries, trimLen, trimStrict));
}
private <R> RFuture<R> addAllCustomAsync(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
private <R> RFuture<R> addAllCustomAsync(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
List<Object> params = new ArrayList<Object>(entries.size()*2 + 1);
params.add(getName());
@ -293,7 +293,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public RFuture<Void> addAllAsync(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
public RFuture<Void> addAllAsync(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
return addAllCustomAsync(id, entries, trimLen, trimStrict);
}
@ -308,22 +308,22 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<StreamId, Map<K, V>> read(int count, StreamId ... ids) {
public Map<StreamMessageId, Map<K, V>> read(int count, StreamMessageId ... ids) {
return get(readAsync(count, ids));
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, Map<String, StreamId> keyToId) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readAsync(count, -1, null, id, keyToId);
}
@Override
public Map<StreamId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamMessageId... ids) {
return get(readAsync(count, timeout, unit, ids));
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
List<Object> params = new ArrayList<Object>();
if (count > 0) {
params.add("COUNT");
@ -342,7 +342,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
params.add(id);
for (StreamId nextId : keyToId.values()) {
for (StreamMessageId nextId : keyToId.values()) {
params.add(nextId.toString());
}
@ -353,21 +353,21 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public RFuture<StreamId> addAsync(K key, V value) {
public RFuture<StreamMessageId> addAsync(K key, V value) {
return addAsync(key, value, 0, false);
}
@Override
public RFuture<Void> addAsync(StreamId id, K key, V value) {
public RFuture<Void> addAsync(StreamMessageId id, K key, V value) {
return addAsync(id, key, value, 0, false);
}
@Override
public RFuture<StreamId> addAsync(K key, V value, int trimLen, boolean trimStrict) {
public RFuture<StreamMessageId> addAsync(K key, V value, int trimLen, boolean trimStrict) {
return addCustomAsync(null, key, value, trimLen, trimStrict);
}
private <R> RFuture<R> addCustomAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict) {
private <R> RFuture<R> addCustomAsync(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) {
List<Object> params = new LinkedList<Object>();
params.add(getName());
@ -398,38 +398,38 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public RFuture<Void> addAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict) {
public RFuture<Void> addAsync(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) {
return addCustomAsync(id, key, value, trimLen, trimStrict);
}
@Override
public StreamId add(K key, V value) {
public StreamMessageId add(K key, V value) {
return get(addAsync(key, value));
}
@Override
public void add(StreamId id, K key, V value) {
public void add(StreamMessageId id, K key, V value) {
get(addAsync(id, key, value));
}
@Override
public StreamId add(K key, V value, int trimLen, boolean trimStrict) {
public StreamMessageId add(K key, V value, int trimLen, boolean trimStrict) {
return get(addAsync(key, value, trimLen, trimStrict));
}
@Override
public void add(StreamId id, K key, V value, int trimLen, boolean trimStrict) {
public void add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) {
get(addAsync(id, key, value, trimLen, trimStrict));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readAsync(int count, StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, StreamMessageId... ids) {
return readAsync(count, 0, null, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit,
StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit,
StreamMessageId... ids) {
List<Object> params = new ArrayList<Object>();
if (count > 0) {
params.add("COUNT");
@ -444,7 +444,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
params.add("STREAMS");
params.add(getName());
for (StreamId id : ids) {
for (StreamMessageId id : ids) {
params.add(id.toString());
}
@ -455,17 +455,17 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, Map<String, StreamId> keyToId) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return get(readAsync(count, id, keyToId));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return get(readAsync(count, timeout, unit, id, keyToId));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> rangeAsync(int count, StreamId startId, StreamId endId) {
public RFuture<Map<StreamMessageId, Map<K, V>>> rangeAsync(int count, StreamMessageId startId, StreamMessageId endId) {
List<Object> params = new LinkedList<Object>();
params.add(getName());
params.add(startId);
@ -480,12 +480,12 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<StreamId, Map<K, V>> range(int count, StreamId startId, StreamId endId) {
public Map<StreamMessageId, Map<K, V>> range(int count, StreamMessageId startId, StreamMessageId endId) {
return get(rangeAsync(count, startId, endId));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> rangeReversedAsync(int count, StreamId startId, StreamId endId) {
public RFuture<Map<StreamMessageId, Map<K, V>>> rangeReversedAsync(int count, StreamMessageId startId, StreamMessageId endId) {
List<Object> params = new LinkedList<Object>();
params.add(getName());
params.add(startId);
@ -500,172 +500,188 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<StreamId, Map<K, V>> rangeReversed(int count, StreamId startId, StreamId endId) {
public Map<StreamMessageId, Map<K, V>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId) {
return get(rangeReversedAsync(count, startId, endId));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readAsync(StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(StreamMessageId... ids) {
return readAsync(0, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamId... ids) {
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamMessageId... ids) {
return readAsync(0, timeout, unit, ids);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, Map<String, StreamId> keyToId) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readAsync(0, id, keyToId);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return readAsync(0, timeout, unit, id, keyToId);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> rangeAsync(StreamId startId, StreamId endId) {
public RFuture<Map<StreamMessageId, Map<K, V>>> rangeAsync(StreamMessageId startId, StreamMessageId endId) {
return rangeAsync(0, startId, endId);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> rangeReversedAsync(StreamId startId, StreamId endId) {
public RFuture<Map<StreamMessageId, Map<K, V>>> rangeReversedAsync(StreamMessageId startId, StreamMessageId endId) {
return rangeReversedAsync(0, startId, endId);
}
@Override
public Map<StreamId, Map<K, V>> read(StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> read(StreamMessageId... ids) {
return read(0, ids);
}
@Override
public Map<StreamId, Map<K, V>> read(long timeout, TimeUnit unit, StreamId... ids) {
public Map<StreamMessageId, Map<K, V>> read(long timeout, TimeUnit unit, StreamMessageId... ids) {
return read(0, timeout, unit, ids);
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, Map<String, StreamId> keyToId) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return read(0, id, keyToId);
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return read(0, timeout, unit, id, keyToId);
}
@Override
public Map<StreamId, Map<K, V>> range(StreamId startId, StreamId endId) {
public Map<StreamMessageId, Map<K, V>> range(StreamMessageId startId, StreamMessageId endId) {
return range(0, startId, endId);
}
@Override
public Map<StreamId, Map<K, V>> rangeReversed(StreamId startId, StreamId endId) {
public Map<StreamMessageId, Map<K, V>> rangeReversed(StreamMessageId startId, StreamMessageId endId) {
return rangeReversed(0, startId, endId);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String key2, StreamId id2) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2) {
return readAsync(id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String key2, StreamId id2, String key3,
StreamId id3) {
Map<String, StreamId> params = new HashMap<String, StreamId>(2);
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(id, params);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String key2, StreamId id2) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2) {
return readAsync(count, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String key2, StreamId id2,
String key3, StreamId id3) {
Map<String, StreamId> params = new HashMap<String, StreamId>(2);
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2,
String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(count, id, params);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id,
String key2, StreamId id2) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2) {
return readAsync(timeout, unit, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id,
String key2, StreamId id2, String key3, StreamId id3) {
Map<String, StreamId> params = new HashMap<String, StreamId>(2);
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(timeout, unit, id, params);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id,
String key2, StreamId id2) {
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2) {
return readAsync(count, timeout, unit, id, Collections.singletonMap(key2, id2));
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id,
String key2, StreamId id2, String key3, StreamId id3) {
Map<String, StreamId> params = new HashMap<String, StreamId>(2);
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
params.put(key2, id2);
params.put(key3, id3);
return readAsync(count, timeout, unit, id, params);
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String key2, StreamId id2) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String key2, StreamMessageId id2) {
return get(readAsync(id, key2, id2));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String key2, StreamId id2, String key3,
StreamId id3) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
return get(readAsync(id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String key2, StreamId id2) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String key2, StreamMessageId id2) {
return get(readAsync(count, id, key2, id2));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String key2, StreamId id2, String key3,
StreamId id3) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3) {
return get(readAsync(count, id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String key2,
StreamId id2) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2) {
return get(readAsync(timeout, unit, id, key2, id2));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String key2,
StreamId id2, String key3, StreamId id3) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2, String key3, StreamMessageId id3) {
return get(readAsync(timeout, unit, id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String key2,
StreamId id2) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2) {
return get(readAsync(count, timeout, unit, id, key2, id2));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String key2,
StreamId id2, String key3, StreamId id3) {
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2, String key3, StreamMessageId id3) {
return get(readAsync(count, timeout, unit, id, key2, id2, key3, id3));
}
@Override
public RFuture<Long> removeAsync(StreamMessageId... ids) {
List<Object> params = new ArrayList<Object>();
params.add(getName());
for (StreamMessageId id : ids) {
params.add(id);
}
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XDEL, params.toArray());
}
@Override
public long remove(StreamMessageId... ids) {
return get(removeAsync(ids));
}
}

@ -23,12 +23,12 @@ package org.redisson.api;
*/
public class PendingEntry {
private StreamId id;
private StreamMessageId id;
private String consumerName;
private long idleTime;
private long lastTimeDelivered;
public PendingEntry(StreamId id, String consumerName, long idleTime, long lastTimeDelivered) {
public PendingEntry(StreamMessageId id, String consumerName, long idleTime, long lastTimeDelivered) {
super();
this.id = id;
this.consumerName = consumerName;
@ -41,7 +41,7 @@ public class PendingEntry {
*
* @return id
*/
public StreamId getId() {
public StreamMessageId getId() {
return id;
}

@ -29,14 +29,14 @@ public class PendingResult implements Serializable {
private static final long serialVersionUID = -5525031552305408248L;
private long total;
private StreamId lowestId;
private StreamId highestId;
private StreamMessageId lowestId;
private StreamMessageId highestId;
private Map<String, Long> consumerNames;
public PendingResult() {
}
public PendingResult(long total, StreamId lowestId, StreamId highestId, Map<String, Long> consumerNames) {
public PendingResult(long total, StreamMessageId lowestId, StreamMessageId highestId, Map<String, Long> consumerNames) {
super();
this.total = total;
this.lowestId = lowestId;
@ -58,7 +58,7 @@ public class PendingResult implements Serializable {
*
* @return number
*/
public StreamId getLowestId() {
public StreamMessageId getLowestId() {
return lowestId;
}
@ -67,7 +67,7 @@ public class PendingResult implements Serializable {
*
* @return number
*/
public StreamId getHighestId() {
public StreamMessageId getHighestId() {
return highestId;
}

@ -39,24 +39,24 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
void createGroup(String groupName);
/**
* Creates consumer group by name and stream id.
* Creates consumer group by name and Stream Message ID.
* Only new messages after defined stream <code>id</code> will be available for consumers of this group.
* <p>
* {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating
* {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creating
*
* @param groupName - name of group
* @param id - stream id
* @param id - Stream Message ID
*/
void createGroup(String groupName, StreamId id);
void createGroup(String groupName, StreamMessageId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
*
* @param groupName - name of group
* @param ids - stream ids
* @param ids - Stream Message IDs
* @return marked messages amount
*/
Long ack(String groupName, StreamId... ids);
long ack(String groupName, StreamMessageId... ids);
/**
* Returns pending messages by group name
@ -68,34 +68,34 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
/**
* Returns list of pending messages by group name.
* Limited by start stream id and end stream id and count.
* Limited by start Stream Message ID and end Stream Message ID and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max Stream Message ID
* {@link StreamMessageId#MIN} is used as min Stream Message ID
*
* @param groupName - name of group
* @param startId - start stream id
* @param endId - end stream id
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @param count - amount of messages
* @return list
*/
List<PendingEntry> listPending(String groupName, StreamId startId, StreamId endId, int count);
List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Returns list of pending messages by group name and consumer name.
* Limited by start stream id and end stream id and count.
* Limited by start Stream Message ID and end Stream Message ID and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max Stream Message ID
* {@link StreamMessageId#MIN} is used as min Stream Message ID
*
* @param consumerName - name of consumer
* @param groupName - name of group
* @param startId - start stream id
* @param endId - end stream id
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @param count - amount of messages
* @return list
*/
List<PendingEntry> listPending(String groupName, StreamId startId, StreamId endId, int count, String consumerName);
List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count, String consumerName);
/**
* Transfers ownership of pending messages by id to a new consumer
@ -105,47 +105,47 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param consumerName - name of consumer
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param ids - stream ids
* @return stream data mapped by Stream ID
* @param ids - Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> claimPending(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> claimPending(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
@ -153,10 +153,10 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Returns number of entries in stream
@ -166,25 +166,25 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
long size();
/**
* Appends a new entry and returns generated Stream ID
* Appends a new entry and returns generated Stream Message ID
*
* @param key - key of entry
* @param value - value of entry
* @return Stream ID
* @return Stream Message ID
*/
StreamId add(K key, V value);
StreamMessageId add(K key, V value);
/**
* Appends a new entry by specified Stream ID
* Appends a new entry by specified Stream Message ID
*
* @param id - Stream ID
* @param id - Stream Message ID
* @param key - key of entry
* @param value - value of entry
*/
void add(StreamId id, K key, V value);
void add(StreamMessageId id, K key, V value);
/**
* Appends a new entry and returns generated Stream ID.
* Appends a new entry and returns generated Stream Message ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
@ -192,102 +192,102 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param value - value of entry
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
* @return Stream Message ID
*/
StreamId add(K key, V value, int trimLen, boolean trimStrict);
StreamMessageId add(K key, V value, int trimLen, boolean trimStrict);
/**
* Appends a new entry by specified Stream ID.
* Appends a new entry by specified Stream Message ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param id - Stream ID
* @param id - Stream Message ID
* @param key - key of entry
* @param value - value of entry
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
*/
void add(StreamId id, K key, V value, int trimLen, boolean trimStrict);
void add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict);
/**
* Appends new entries and returns generated Stream ID
* Appends new entries and returns generated Stream Message ID
*
* @param entries - entries to add
* @return Stream ID
* @return Stream Message ID
*/
StreamId addAll(Map<K, V> entries);
StreamMessageId addAll(Map<K, V> entries);
/**
* Appends new entries by specified Stream ID
* Appends new entries by specified Stream Message ID
*
* @param id - Stream ID
* @param id - Stream Message ID
* @param entries - entries to add
*/
void addAll(StreamId id, Map<K, V> entries);
void addAll(StreamMessageId id, Map<K, V> entries);
/**
* Appends new entries and returns generated Stream ID.
* Appends new entries and returns generated Stream Message ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param entries - entries to add
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
* @return Stream Message ID
*/
StreamId addAll(Map<K, V> entries, int trimLen, boolean trimStrict);
StreamMessageId addAll(Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Appends new entries by specified Stream ID.
* Appends new entries by specified Stream Message ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param id - Stream ID
* @param id - Stream Message ID
* @param entries - entries to add
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
*/
void addAll(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict);
void addAll(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream IDs.
* Read stream data by specified collection of Stream Message IDs.
*
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> read(StreamId ... ids);
Map<StreamMessageId, Map<K, V>> read(StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Read stream data by specified collection of Stream Message IDs.
*
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> read(int count, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> read(int count, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Read stream data by specified collection of Stream Message IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> read(long timeout, TimeUnit unit, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> read(long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Read stream data by specified collection of Stream Message IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamId ... ids);
Map<StreamMessageId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified stream name including this stream.
@ -295,9 +295,9 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String name2, StreamId id2);
Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -307,18 +307,18 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Read stream data by specified Stream Message ID mapped by name including this stream.
*
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
* @param nameToId - Stream Message ID mapped by name
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, Map<String, StreamId> nameToId);
Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -327,9 +327,9 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String name2, StreamId id2);
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -340,19 +340,19 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Read stream data by specified Stream Message ID mapped by name including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
* @param nameToId - Stream Message ID mapped by name
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, Map<String, StreamId> nameToId);
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -363,9 +363,9 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -378,21 +378,21 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Read stream data by specified Stream Message ID mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
* @param nameToId - Stream Message ID mapped by name
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -404,9 +404,9 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -420,59 +420,67 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Read stream data by specified Stream Message ID mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
* @param nameToId - Stream Message ID mapped by name
* @return stream data mapped by key and Stream Message ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
* Read stream data in range by specified start Stream Message ID (included) and end Stream Message ID (included).
*
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> range(StreamId startId, StreamId endId);
Map<StreamMessageId, Map<K, V>> range(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
* Read stream data in range by specified start Stream Message ID (included) and end Stream Message ID (included).
*
* @param count - stream data size limit
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> range(int count, StreamId startId, StreamId endId);
Map<StreamMessageId, Map<K, V>> range(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
* Read stream data in reverse order in range by specified start Stream Message ID (included) and end Stream Message ID (included).
*
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> rangeReversed(StreamId startId, StreamId endId);
Map<StreamMessageId, Map<K, V>> rangeReversed(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
* Read stream data in reverse order in range by specified start Stream Message ID (included) and end Stream Message ID (included).
*
* @param count - stream data size limit
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
* @param startId - start Stream Message ID
* @param endId - end Stream Message ID
* @return stream data mapped by Stream Message ID
*/
Map<StreamId, Map<K, V>> rangeReversed(int count, StreamId startId, StreamId endId);
Map<StreamMessageId, Map<K, V>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Removes messages by id.
*
* @param ids - id of messages to remove
* @return deleted messages amount
*/
long remove(StreamMessageId... ids);
}

@ -43,13 +43,13 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* Creates consumer group by name and stream id.
* Only new messages after defined stream <code>id</code> will be available for consumers of this group.
* <p>
* {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating
* {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creating
*
* @param groupName - name of group
* @param id - stream id
* @return void
*/
RFuture<Void> createGroupAsync(String groupName, StreamId id);
RFuture<Void> createGroupAsync(String groupName, StreamMessageId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
@ -58,7 +58,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - stream ids
* @return marked messages amount
*/
RFuture<Long> ackAsync(String groupName, StreamId... ids);
RFuture<Long> ackAsync(String groupName, StreamMessageId... ids);
/**
* Returns pending messages by group name
@ -72,8 +72,8 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* Returns list of pending messages by group name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max stream id
* {@link StreamMessageId#MIN} is used as min stream id
*
* @param groupName - name of group
* @param startId - start stream id
@ -81,14 +81,14 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param count - amount of messages
* @return list
*/
RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count);
RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Returns list of pending messages by group name and consumer name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max stream id
* {@link StreamMessageId#MIN} is used as min stream id
*
* @param consumerName - name of consumer
* @param groupName - name of group
@ -97,7 +97,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param count - amount of messages
* @return list
*/
RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count, String consumerName);
RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, int count, String consumerName);
/**
* Transfers ownership of pending messages by id to a new consumer
@ -110,7 +110,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - stream ids
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> claimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> claimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -120,7 +120,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -131,7 +131,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -144,7 +144,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -158,7 +158,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Returns number of entries in stream
@ -174,7 +174,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param value - value of entry
* @return Stream ID
*/
RFuture<StreamId> addAsync(K key, V value);
RFuture<StreamMessageId> addAsync(K key, V value);
/**
* Appends a new entry by specified Stream ID
@ -184,7 +184,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param value - value of entry
* @return void
*/
RFuture<Void> addAsync(StreamId id, K key, V value);
RFuture<Void> addAsync(StreamMessageId id, K key, V value);
/**
* Appends a new entry and returns generated Stream ID.
@ -197,7 +197,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
RFuture<StreamId> addAsync(K key, V value, int trimLen, boolean trimStrict);
RFuture<StreamMessageId> addAsync(K key, V value, int trimLen, boolean trimStrict);
/**
* Appends a new entry by specified Stream ID.
@ -211,7 +211,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
RFuture<Void> addAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict);
RFuture<Void> addAsync(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict);
/**
* Appends new entries and returns generated Stream ID
@ -219,7 +219,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param entries - entries to add
* @return Stream ID
*/
RFuture<StreamId> addAllAsync(Map<K, V> entries);
RFuture<StreamMessageId> addAllAsync(Map<K, V> entries);
/**
* Appends new entries by specified Stream ID
@ -228,7 +228,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param entries - entries to add
* @return void
*/
RFuture<Void> addAllAsync(StreamId id, Map<K, V> entries);
RFuture<Void> addAllAsync(StreamMessageId id, Map<K, V> entries);
/**
* Appends new entries and returns generated Stream ID.
@ -240,7 +240,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
RFuture<StreamId> addAllAsync(Map<K, V> entries, int trimLen, boolean trimStrict);
RFuture<StreamMessageId> addAllAsync(Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Appends new entries by specified Stream ID.
@ -253,7 +253,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
RFuture<Void> addAllAsync(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict);
RFuture<Void> addAllAsync(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream IDs.
@ -261,7 +261,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readAsync(StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -270,7 +270,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readAsync(int count, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -281,7 +281,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -293,7 +293,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit, StreamId ... ids);
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified stream name including this stream.
@ -303,7 +303,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String name2, StreamId id2);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -315,7 +315,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String name2, StreamId id2, String name3, StreamId id3);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -324,7 +324,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, Map<String, StreamId> nameToId);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -335,7 +335,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String name2, StreamId id2);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -348,7 +348,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -358,7 +358,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, Map<String, StreamId> nameToId);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -371,7 +371,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -386,7 +386,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -398,7 +398,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -412,7 +412,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -428,7 +428,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -441,7 +441,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
@ -450,7 +450,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> rangeAsync(StreamId startId, StreamId endId);
RFuture<Map<StreamMessageId, Map<K, V>>> rangeAsync(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
@ -460,7 +460,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> rangeAsync(int count, StreamId startId, StreamId endId);
RFuture<Map<StreamMessageId, Map<K, V>>> rangeAsync(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
@ -469,7 +469,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> rangeReversedAsync(StreamId startId, StreamId endId);
RFuture<Map<StreamMessageId, Map<K, V>>> rangeReversedAsync(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
@ -479,6 +479,14 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> rangeReversedAsync(int count, StreamId startId, StreamId endId);
RFuture<Map<StreamMessageId, Map<K, V>>> rangeReversedAsync(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Removes messages by id.
*
* @param ids - id of messages to remove
* @return deleted messages amount
*/
RFuture<Long> removeAsync(StreamMessageId... ids);
}

@ -45,13 +45,13 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* Creates consumer group by name and stream id.
* Only new messages after defined stream <code>id</code> will be available for consumers of this group.
* <p>
* {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating
* {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creating
*
* @param groupName - name of group
* @param id - stream id
* @return void
*/
Publisher<Void> createGroup(String groupName, StreamId id);
Publisher<Void> createGroup(String groupName, StreamMessageId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
@ -60,7 +60,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - stream ids
* @return marked messages amount
*/
Publisher<Long> ack(String groupName, StreamId... ids);
Publisher<Long> ack(String groupName, StreamMessageId... ids);
/**
* Returns pending messages by group name
@ -74,8 +74,8 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* Returns list of pending messages by group name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max stream id
* {@link StreamMessageId#MIN} is used as min stream id
*
* @param groupName - name of group
* @param startId - start stream id
@ -83,14 +83,14 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param count - amount of messages
* @return list
*/
Publisher<List<PendingEntry>> listPending(String groupName, StreamId startId, StreamId endId, int count);
Publisher<List<PendingEntry>> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Returns list of pending messages by group name and consumer name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max stream id
* {@link StreamMessageId#MIN} is used as min stream id
*
* @param consumerName - name of consumer
* @param groupName - name of group
@ -99,7 +99,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param count - amount of messages
* @return list
*/
Publisher<List<PendingEntry>> listPending(String groupName, StreamId startId, StreamId endId, int count, String consumerName);
Publisher<List<PendingEntry>> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count, String consumerName);
/**
* Transfers ownership of pending messages by id to a new consumer
@ -112,7 +112,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - stream ids
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids);
Publisher<Map<StreamMessageId, Map<K, V>>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -122,7 +122,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamId ... ids);
Publisher<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -133,7 +133,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamId ... ids);
Publisher<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -146,7 +146,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids);
Publisher<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -160,7 +160,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids);
Publisher<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
@ -177,7 +177,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param value - value of entry
* @return Stream ID
*/
Publisher<StreamId> add(K key, V value);
Publisher<StreamMessageId> add(K key, V value);
/**
* Appends a new entry by specified Stream ID
@ -187,7 +187,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param value - value of entry
* @return void
*/
Publisher<Void> add(StreamId id, K key, V value);
Publisher<Void> add(StreamMessageId id, K key, V value);
/**
* Appends a new entry and returns generated Stream ID.
@ -200,7 +200,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
Publisher<StreamId> add(K key, V value, int trimLen, boolean trimStrict);
Publisher<StreamMessageId> add(K key, V value, int trimLen, boolean trimStrict);
/**
* Appends a new entry by specified Stream ID.
@ -214,7 +214,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
Publisher<Void> add(StreamId id, K key, V value, int trimLen, boolean trimStrict);
Publisher<Void> add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict);
/**
* Appends new entries and returns generated Stream ID
@ -222,7 +222,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param entries - entries to add
* @return Stream ID
*/
Publisher<StreamId> addAll(Map<K, V> entries);
Publisher<StreamMessageId> addAll(Map<K, V> entries);
/**
* Appends new entries by specified Stream ID
@ -231,7 +231,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param entries - entries to add
* @return void
*/
Publisher<Void> addAll(StreamId id, Map<K, V> entries);
Publisher<Void> addAll(StreamMessageId id, Map<K, V> entries);
/**
* Appends new entries and returns generated Stream ID.
@ -243,7 +243,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
Publisher<StreamId> addAll(Map<K, V> entries, int trimLen, boolean trimStrict);
Publisher<StreamMessageId> addAll(Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Appends new entries by specified Stream ID.
@ -256,7 +256,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
Publisher<Void> addAll(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict);
Publisher<Void> addAll(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream IDs.
@ -264,7 +264,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> read(StreamId ... ids);
Publisher<Map<StreamMessageId, Map<K, V>>> read(StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -273,7 +273,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> read(int count, StreamId ... ids);
Publisher<Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -284,7 +284,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId ... ids);
Publisher<Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -296,7 +296,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId ... ids);
Publisher<Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified stream name including this stream.
@ -306,7 +306,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, String name2, StreamId id2);
Publisher<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -318,7 +318,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Publisher<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -327,7 +327,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, Map<String, StreamId> nameToId);
Publisher<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -338,7 +338,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, String name2, StreamId id2);
Publisher<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -351,7 +351,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Publisher<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -361,7 +361,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, Map<String, StreamId> nameToId);
Publisher<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -374,7 +374,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
Publisher<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -389,7 +389,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Publisher<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -401,7 +401,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
Publisher<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -415,7 +415,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
Publisher<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -431,7 +431,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Publisher<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -444,7 +444,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
Publisher<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
@ -453,7 +453,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> range(StreamId startId, StreamId endId);
Publisher<Map<StreamMessageId, Map<K, V>>> range(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
@ -463,7 +463,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> range(int count, StreamId startId, StreamId endId);
Publisher<Map<StreamMessageId, Map<K, V>>> range(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
@ -472,7 +472,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> rangeReversed(StreamId startId, StreamId endId);
Publisher<Map<StreamMessageId, Map<K, V>>> rangeReversed(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
@ -482,6 +482,6 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> rangeReversed(int count, StreamId startId, StreamId endId);
Publisher<Map<StreamMessageId, Map<K, V>>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId);
}

@ -45,13 +45,13 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* Creates consumer group by name and stream id.
* Only new messages after defined stream <code>id</code> will be available for consumers of this group.
* <p>
* {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating
* {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creating
*
* @param groupName - name of group
* @param id - stream id
* @return void
*/
Flowable<Void> createGroup(String groupName, StreamId id);
Flowable<Void> createGroup(String groupName, StreamMessageId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
@ -60,7 +60,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - stream ids
* @return marked messages amount
*/
Flowable<Long> ack(String groupName, StreamId... ids);
Flowable<Long> ack(String groupName, StreamMessageId... ids);
/**
* Returns pending messages by group name
@ -74,8 +74,8 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* Returns list of pending messages by group name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max stream id
* {@link StreamMessageId#MIN} is used as min stream id
*
* @param groupName - name of group
* @param startId - start stream id
@ -83,14 +83,14 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param count - amount of messages
* @return list
*/
Flowable<List<PendingEntry>> listPending(String groupName, StreamId startId, StreamId endId, int count);
Flowable<List<PendingEntry>> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Returns list of pending messages by group name and consumer name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
* {@link StreamMessageId#MAX} is used as max stream id
* {@link StreamMessageId#MIN} is used as min stream id
*
* @param consumerName - name of consumer
* @param groupName - name of group
@ -99,7 +99,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param count - amount of messages
* @return list
*/
Flowable<List<PendingEntry>> listPending(String groupName, StreamId startId, StreamId endId, int count, String consumerName);
Flowable<List<PendingEntry>> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count, String consumerName);
/**
* Transfers ownership of pending messages by id to a new consumer
@ -112,7 +112,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - stream ids
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -122,7 +122,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -133,7 +133,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -146,7 +146,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -160,7 +160,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
@ -177,7 +177,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param value - value of entry
* @return Stream ID
*/
Flowable<StreamId> add(K key, V value);
Flowable<StreamMessageId> add(K key, V value);
/**
* Appends a new entry by specified Stream ID
@ -187,7 +187,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param value - value of entry
* @return void
*/
Flowable<Void> add(StreamId id, K key, V value);
Flowable<Void> add(StreamMessageId id, K key, V value);
/**
* Appends a new entry and returns generated Stream ID.
@ -200,7 +200,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
Flowable<StreamId> add(K key, V value, int trimLen, boolean trimStrict);
Flowable<StreamMessageId> add(K key, V value, int trimLen, boolean trimStrict);
/**
* Appends a new entry by specified Stream ID.
@ -214,7 +214,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
Flowable<Void> add(StreamId id, K key, V value, int trimLen, boolean trimStrict);
Flowable<Void> add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict);
/**
* Appends new entries and returns generated Stream ID
@ -222,7 +222,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param entries - entries to add
* @return Stream ID
*/
Flowable<StreamId> addAll(Map<K, V> entries);
Flowable<StreamMessageId> addAll(Map<K, V> entries);
/**
* Appends new entries by specified Stream ID
@ -231,7 +231,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param entries - entries to add
* @return void
*/
Flowable<Void> addAll(StreamId id, Map<K, V> entries);
Flowable<Void> addAll(StreamMessageId id, Map<K, V> entries);
/**
* Appends new entries and returns generated Stream ID.
@ -243,7 +243,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
Flowable<StreamId> addAll(Map<K, V> entries, int trimLen, boolean trimStrict);
Flowable<StreamMessageId> addAll(Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Appends new entries by specified Stream ID.
@ -256,7 +256,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
Flowable<Void> addAll(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict);
Flowable<Void> addAll(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream IDs.
@ -264,7 +264,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> read(StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> read(StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -273,7 +273,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> read(int count, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -284,7 +284,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
@ -296,7 +296,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId ... ids);
Flowable<Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId ... ids);
/**
* Read stream data by specified stream name including this stream.
@ -306,7 +306,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, String name2, StreamId id2);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -318,7 +318,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -327,7 +327,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, Map<String, StreamId> nameToId);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -338,7 +338,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, String name2, StreamId id2);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -351,7 +351,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -361,7 +361,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, Map<String, StreamId> nameToId);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -374,7 +374,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -389,7 +389,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -401,7 +401,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
@ -415,7 +415,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
@ -431,7 +431,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
@ -444,7 +444,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Flowable<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
@ -453,7 +453,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> range(StreamId startId, StreamId endId);
Flowable<Map<StreamMessageId, Map<K, V>>> range(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
@ -463,7 +463,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> range(int count, StreamId startId, StreamId endId);
Flowable<Map<StreamMessageId, Map<K, V>>> range(int count, StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
@ -472,7 +472,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> rangeReversed(StreamId startId, StreamId endId);
Flowable<Map<StreamMessageId, Map<K, V>>> rangeReversed(StreamMessageId startId, StreamMessageId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
@ -482,6 +482,6 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Flowable<Map<StreamId, Map<K, V>>> rangeReversed(int count, StreamId startId, StreamId endId);
Flowable<Map<StreamMessageId, Map<K, V>>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId);
}

@ -21,34 +21,34 @@ package org.redisson.api;
* @author Nikita Koksharov
*
*/
public class StreamId {
public class StreamMessageId {
/**
* Defines minimal id. Used in {@link RStream#range} and {@link RStreamAsync#rangeAsync} methods
*/
public static final StreamId MIN = new StreamId(-1);
public static final StreamMessageId MIN = new StreamMessageId(-1);
/**
* Defines maximal id. Used in {@link RStream#range} and {@link RStreamAsync#rangeAsync} methods
*/
public static final StreamId MAX = new StreamId(-1);
public static final StreamMessageId MAX = new StreamMessageId(-1);
/**
* Defines latest id to receive Stream entries added since method invocation.
* <p>
* Used in {@link RStream#read} and {@link RStreamAsync#readAsync} methods
*/
public static final StreamId NEWEST = new StreamId(-1);
public static final StreamMessageId NEWEST = new StreamMessageId(-1);
private long id0;
private long id1;
public StreamId(long id0) {
public StreamMessageId(long id0) {
super();
this.id0 = id0;
}
public StreamId(long id0, long id1) {
public StreamMessageId(long id0, long id1) {
super();
this.id0 = id0;
this.id1 = id1;
@ -89,7 +89,7 @@ public class StreamId {
return false;
if (getClass() != obj.getClass())
return false;
StreamId other = (StreamId) obj;
StreamMessageId other = (StreamMessageId) obj;
if (id0 != other.id0)
return false;
if (id1 != other.id1)

@ -23,7 +23,7 @@ import java.util.Map.Entry;
import java.util.Set;
import org.redisson.api.RType;
import org.redisson.api.StreamId;
import org.redisson.api.StreamMessageId;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor;
@ -325,17 +325,17 @@ public interface RedisCommands {
RedisCommand<Boolean> SETNX = new RedisCommand<Boolean>("SETNX", new BooleanReplayConvertor());
RedisCommand<Void> PSETEX = new RedisCommand<Void>("PSETEX", new VoidReplayConvertor());
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XRANGE = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XRANGE",
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XRANGE = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XRANGE",
new ListMultiDecoder(
new ObjectDecoder(new StreamIdDecoder()),
new ObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(ListMultiDecoder.RESET),
new ObjectMapJoinDecoder()));
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XREVRANGE = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XREVRANGE",
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREVRANGE = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREVRANGE",
XRANGE.getReplayMultiDecoder());
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XREAD = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XREAD",
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREAD = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREAD",
new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()),
@ -345,9 +345,9 @@ public interface RedisCommands {
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX),
new StreamResultDecoder()));
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XREAD_BLOCKING = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XREAD", XREAD.getReplayMultiDecoder());
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREAD_BLOCKING = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREAD", XREAD.getReplayMultiDecoder());
RedisCommand<Map<StreamId, Map<Object, Object>>> XREAD_SINGLE = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREAD",
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREAD_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREAD",
new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()),
@ -357,9 +357,9 @@ public interface RedisCommands {
new ObjectMapReplayDecoder(),
new StreamResultDecoder()));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREAD_BLOCKING_SINGLE = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREAD", XREAD_SINGLE.getReplayMultiDecoder());
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREAD_BLOCKING_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREAD", XREAD_SINGLE.getReplayMultiDecoder());
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XREADGROUP = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XREADGROUP",
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREADGROUP = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREADGROUP",
new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()),
@ -369,7 +369,7 @@ public interface RedisCommands {
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX),
new StreamResultDecoder()));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREADGROUP_BLOCKING = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREADGROUP",
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREADGROUP_BLOCKING = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREADGROUP",
new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()),
@ -379,7 +379,7 @@ public interface RedisCommands {
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX),
new StreamResultDecoder()));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREADGROUP_SINGLE = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREADGROUP",
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREADGROUP_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREADGROUP",
new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()),
@ -389,24 +389,25 @@ public interface RedisCommands {
new ObjectMapReplayDecoder(),
new StreamResultDecoder()));
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XCLAIM = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XCLAIM",
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XCLAIM = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XCLAIM",
new ListMultiDecoder(
new ObjectDecoder(new StreamIdDecoder()),
new ObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(ListMultiDecoder.RESET),
new ObjectMapJoinDecoder()));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREADGROUP_BLOCKING_SINGLE = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREADGROUP",
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREADGROUP_BLOCKING_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREADGROUP",
XREADGROUP_SINGLE.getReplayMultiDecoder());
Set<RedisCommand> BLOCKING_COMMANDS = new HashSet<RedisCommand>(Arrays.asList(
XREAD_BLOCKING_SINGLE, XREAD_BLOCKING, XREADGROUP_BLOCKING_SINGLE, XREADGROUP_BLOCKING));
RedisStrictCommand<StreamId> XADD = new RedisStrictCommand<StreamId>("XADD", new StreamIdConvertor());
RedisStrictCommand<StreamMessageId> XADD = new RedisStrictCommand<StreamMessageId>("XADD", new StreamIdConvertor());
RedisStrictCommand<Void> XGROUP = new RedisStrictCommand<Void>("XGROUP", new VoidReplayConvertor());
RedisStrictCommand<Void> XADD_VOID = new RedisStrictCommand<Void>("XADD", new VoidReplayConvertor());
RedisStrictCommand<Long> XLEN = new RedisStrictCommand<Long>("XLEN");
RedisStrictCommand<Long> XACK = new RedisStrictCommand<Long>("XACK");
RedisStrictCommand<Long> XDEL = new RedisStrictCommand<Long>("XDEL");
RedisCommand<Object> XPENDING = new RedisCommand<Object>("XPENDING",
new ListMultiDecoder(new ObjectListReplayDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new PendingResultDecoder()));
RedisCommand<Object> XPENDING_ENTRIES = new RedisCommand<Object>("XPENDING",

@ -15,19 +15,19 @@
*/
package org.redisson.client.protocol.convertor;
import org.redisson.api.StreamId;
import org.redisson.api.StreamMessageId;
/**
*
* @author Nikita Koksharov
*
*/
public class StreamIdConvertor extends SingleConvertor<StreamId> {
public class StreamIdConvertor extends SingleConvertor<StreamMessageId> {
@Override
public StreamId convert(Object id) {
public StreamMessageId convert(Object id) {
String[] parts = id.toString().split("-");
return new StreamId(Long.valueOf(parts[0]), Long.valueOf(parts[1]));
return new StreamMessageId(Long.valueOf(parts[0]), Long.valueOf(parts[1]));
}
}

@ -17,7 +17,7 @@ package org.redisson.client.protocol.decoder;
import java.io.IOException;
import org.redisson.api.StreamId;
import org.redisson.api.StreamMessageId;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
@ -35,7 +35,7 @@ public class StreamIdDecoder implements Decoder<Object> {
public Object decode(ByteBuf buf, State state) throws IOException {
String id = (String) StringCodec.INSTANCE.getValueDecoder().decode(buf, state);
String[] parts = id.toString().split("-");
return new StreamId(Long.valueOf(parts[0]), Long.valueOf(parts[1]));
return new StreamMessageId(Long.valueOf(parts[0]), Long.valueOf(parts[1]));
}
}

@ -19,7 +19,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.redisson.api.StreamId;
import org.redisson.api.StreamMessageId;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
@ -33,7 +33,7 @@ public class StreamResultDecoder implements MultiDecoder<Object> {
@Override
public Object decode(List<Object> parts, State state) {
if (!parts.isEmpty()) {
Map<String, Map<StreamId, Map<Object, Object>>> result = (Map<String, Map<StreamId, Map<Object, Object>>>) parts.get(0);
Map<String, Map<StreamMessageId, Map<Object, Object>>> result = (Map<String, Map<StreamMessageId, Map<Object, Object>>>) parts.get(0);
return result.values().iterator().next();
}
return Collections.emptyMap();

@ -12,10 +12,22 @@ import org.junit.Test;
import org.redisson.api.PendingEntry;
import org.redisson.api.PendingResult;
import org.redisson.api.RStream;
import org.redisson.api.StreamId;
import org.redisson.api.StreamMessageId;
public class RedissonStreamTest extends BaseTest {
@Test
public void testRemove() {
RStream<String, String> stream = redisson.getStream("test");
StreamMessageId id1 = stream.add("0", "0");
StreamMessageId id2 = stream.add("1", "1");
assertThat(stream.size()).isEqualTo(2);
stream.remove(id1, id2);
assertThat(stream.size()).isZero();
}
@Test
public void testClaim() {
RStream<String, String> stream = redisson.getStream("test");
@ -24,19 +36,19 @@ public class RedissonStreamTest extends BaseTest {
stream.createGroup("testGroup");
StreamId id1 = stream.add("1", "1");
StreamId id2 = stream.add("2", "2");
StreamMessageId id1 = stream.add("1", "1");
StreamMessageId id2 = stream.add("2", "2");
Map<StreamId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
StreamId id3 = stream.add("3", "33");
StreamId id4 = stream.add("4", "44");
StreamMessageId id3 = stream.add("3", "33");
StreamMessageId id4 = stream.add("4", "44");
Map<StreamId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
assertThat(s2.size()).isEqualTo(2);
Map<StreamId, Map<String, String>> res = stream.claimPending("testGroup", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4);
Map<StreamMessageId, Map<String, String>> res = stream.claimPending("testGroup", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4);
assertThat(res.size()).isEqualTo(2);
assertThat(res.keySet()).containsExactly(id3, id4);
for (Map<String, String> map : res.values()) {
@ -53,16 +65,16 @@ public class RedissonStreamTest extends BaseTest {
stream.createGroup("testGroup");
StreamId id1 = stream.add("1", "1");
StreamId id2 = stream.add("2", "2");
StreamMessageId id1 = stream.add("1", "1");
StreamMessageId id2 = stream.add("2", "2");
Map<StreamId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
StreamId id3 = stream.add("3", "3");
StreamId id4 = stream.add("4", "4");
StreamMessageId id3 = stream.add("3", "3");
StreamMessageId id4 = stream.add("4", "4");
Map<StreamId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
assertThat(s2.size()).isEqualTo(2);
PendingResult pi = stream.listPending("testGroup");
@ -71,7 +83,7 @@ public class RedissonStreamTest extends BaseTest {
assertThat(pi.getTotal()).isEqualTo(4);
assertThat(pi.getConsumerNames().keySet()).containsExactly("consumer1", "consumer2");
List<PendingEntry> list = stream.listPending("testGroup", StreamId.MIN, StreamId.MAX, 10);
List<PendingEntry> list = stream.listPending("testGroup", StreamMessageId.MIN, StreamMessageId.MAX, 10);
assertThat(list.size()).isEqualTo(4);
for (PendingEntry pendingEntry : list) {
assertThat(pendingEntry.getId()).isIn(id1, id2, id3, id4);
@ -79,7 +91,7 @@ public class RedissonStreamTest extends BaseTest {
assertThat(pendingEntry.getLastTimeDelivered()).isOne();
}
List<PendingEntry> list2 = stream.listPending("testGroup", StreamId.MIN, StreamId.MAX, 10, "consumer1");
List<PendingEntry> list2 = stream.listPending("testGroup", StreamMessageId.MIN, StreamMessageId.MAX, 10, "consumer1");
assertThat(list2.size()).isEqualTo(2);
for (PendingEntry pendingEntry : list2) {
assertThat(pendingEntry.getId()).isIn(id1, id2);
@ -96,10 +108,10 @@ public class RedissonStreamTest extends BaseTest {
stream.createGroup("testGroup");
StreamId id1 = stream.add("1", "1");
StreamId id2 = stream.add("2", "2");
StreamMessageId id1 = stream.add("1", "1");
StreamMessageId id2 = stream.add("2", "2");
Map<StreamId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
assertThat(stream.ack("testGroup", id1, id2)).isEqualTo(2);
@ -109,7 +121,7 @@ public class RedissonStreamTest extends BaseTest {
public void testReadGroup() {
RStream<String, String> stream = redisson.getStream("test");
StreamId id0 = stream.add("0", "0");
StreamMessageId id0 = stream.add("0", "0");
stream.createGroup("testGroup", id0);
@ -117,7 +129,7 @@ public class RedissonStreamTest extends BaseTest {
stream.add("2", "2");
stream.add("3", "3");
Map<StreamId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.values().iterator().next().keySet()).containsAnyOf("1", "2", "3");
assertThat(s.size()).isEqualTo(3);
@ -125,14 +137,14 @@ public class RedissonStreamTest extends BaseTest {
stream.add("2", "2");
stream.add("3", "3");
Map<StreamId, Map<String, String>> s1 = stream.readGroup("testGroup", "consumer1", 1);
Map<StreamMessageId, Map<String, String>> s1 = stream.readGroup("testGroup", "consumer1", 1);
assertThat(s1.size()).isEqualTo(1);
StreamId id = stream.add("1", "1");
StreamMessageId id = stream.add("1", "1");
stream.add("2", "2");
stream.add("3", "3");
Map<StreamId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer1", id);
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer1", id);
assertThat(s2.size()).isEqualTo(2);
}
@ -144,18 +156,18 @@ public class RedissonStreamTest extends BaseTest {
Map<String, String> entries1 = new HashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, false);
stream.addAll(new StreamMessageId(1), entries1, 1, false);
assertThat(stream.size()).isEqualTo(1);
Map<String, String> entries2 = new HashMap<>();
entries2.put("5", "55");
entries2.put("7", "77");
stream.addAll(new StreamId(2), entries2, 1, false);
stream.addAll(new StreamMessageId(2), entries2, 1, false);
Map<StreamId, Map<String, String>> r2 = stream.rangeReversed(10, StreamId.MAX, StreamId.MIN);
assertThat(r2.keySet()).containsExactly(new StreamId(2), new StreamId(1));
assertThat(r2.get(new StreamId(1))).isEqualTo(entries1);
assertThat(r2.get(new StreamId(2))).isEqualTo(entries2);
Map<StreamMessageId, Map<String, String>> r2 = stream.rangeReversed(10, StreamMessageId.MAX, StreamMessageId.MIN);
assertThat(r2.keySet()).containsExactly(new StreamMessageId(2), new StreamMessageId(1));
assertThat(r2.get(new StreamMessageId(1))).isEqualTo(entries1);
assertThat(r2.get(new StreamMessageId(2))).isEqualTo(entries2);
}
@Test
@ -166,22 +178,22 @@ public class RedissonStreamTest extends BaseTest {
Map<String, String> entries1 = new HashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, false);
stream.addAll(new StreamMessageId(1), entries1, 1, false);
assertThat(stream.size()).isEqualTo(1);
Map<String, String> entries2 = new HashMap<>();
entries2.put("5", "55");
entries2.put("7", "77");
stream.addAll(new StreamId(2), entries2, 1, false);
stream.addAll(new StreamMessageId(2), entries2, 1, false);
Map<StreamId, Map<String, String>> r = stream.range(10, new StreamId(0), new StreamId(1));
Map<StreamMessageId, Map<String, String>> r = stream.range(10, new StreamMessageId(0), new StreamMessageId(1));
assertThat(r).hasSize(1);
assertThat(r.get(new StreamId(1))).isEqualTo(entries1);
assertThat(r.get(new StreamMessageId(1))).isEqualTo(entries1);
Map<StreamId, Map<String, String>> r2 = stream.range(10, StreamId.MIN, StreamId.MAX);
assertThat(r2.keySet()).containsExactly(new StreamId(1), new StreamId(2));
assertThat(r2.get(new StreamId(1))).isEqualTo(entries1);
assertThat(r2.get(new StreamId(2))).isEqualTo(entries2);
Map<StreamMessageId, Map<String, String>> r2 = stream.range(10, StreamMessageId.MIN, StreamMessageId.MAX);
assertThat(r2.keySet()).containsExactly(new StreamMessageId(1), new StreamMessageId(2));
assertThat(r2.get(new StreamMessageId(1))).isEqualTo(entries1);
assertThat(r2.get(new StreamMessageId(2))).isEqualTo(entries2);
}
@Test
@ -201,16 +213,16 @@ public class RedissonStreamTest extends BaseTest {
e.printStackTrace();
}
stream.addAll(new StreamId(1), entries1);
stream.addAll(new StreamMessageId(1), entries1);
}
};
t.start();
long start = System.currentTimeMillis();
Map<String, Map<StreamId, Map<String, String>>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamId(0), "test1", StreamId.NEWEST);
Map<String, Map<StreamMessageId, Map<String, String>>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamMessageId(0), "test1", StreamMessageId.NEWEST);
assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L);
assertThat(s).hasSize(1);
assertThat(s.get("test").get(new StreamId(1))).isEqualTo(entries1);
assertThat(s.get("test").get(new StreamMessageId(1))).isEqualTo(entries1);
}
@Test
@ -230,16 +242,16 @@ public class RedissonStreamTest extends BaseTest {
e.printStackTrace();
}
stream.addAll(new StreamId(1), entries1);
stream.addAll(new StreamMessageId(1), entries1);
}
};
t.start();
long start = System.currentTimeMillis();
Map<StreamId, Map<String, String>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamId(0));
Map<StreamMessageId, Map<String, String>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamMessageId(0));
assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L);
assertThat(s).hasSize(1);
assertThat(s.get(new StreamId(1))).isEqualTo(entries1);
assertThat(s.get(new StreamMessageId(1))).isEqualTo(entries1);
}
@Test
@ -250,20 +262,20 @@ public class RedissonStreamTest extends BaseTest {
Map<String, String> entries1 = new HashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, false);
stream.addAll(new StreamMessageId(1), entries1, 1, false);
assertThat(stream.size()).isEqualTo(1);
Map<String, String> entries2 = new HashMap<>();
entries2.put("5", "55");
entries2.put("7", "77");
stream.addAll(new StreamId(2), entries2, 1, false);
stream.addAll(new StreamMessageId(2), entries2, 1, false);
assertThat(stream.size()).isEqualTo(2);
}
@Test
public void testReadMultiKeysEmpty() {
RStream<String, String> stream = redisson.getStream("test2");
Map<String, Map<StreamId, Map<String, String>>> s = stream.read(10, new StreamId(0), "test1", new StreamId(0));
Map<String, Map<StreamMessageId, Map<String, String>>> s = stream.read(10, new StreamMessageId(0), "test1", new StreamMessageId(0));
assertThat(s).isEmpty();
}
@ -282,7 +294,7 @@ public class RedissonStreamTest extends BaseTest {
entries2.put("6", "66");
stream2.addAll(entries2);
Map<String, Map<StreamId, Map<String, String>>> s = stream2.read(10, new StreamId(0), "test1", new StreamId(0));
Map<String, Map<StreamMessageId, Map<String, String>>> s = stream2.read(10, new StreamMessageId(0), "test1", new StreamMessageId(0));
assertThat(s).hasSize(2);
assertThat(s.get("test1").values().iterator().next()).isEqualTo(entries1);
assertThat(s.get("test2").values().iterator().next()).isEqualTo(entries2);
@ -295,24 +307,24 @@ public class RedissonStreamTest extends BaseTest {
Map<String, String> entries1 = new LinkedHashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, false);
stream.addAll(new StreamMessageId(1), entries1, 1, false);
Map<String, String> entries2 = new LinkedHashMap<>();
entries2.put("5", "55");
entries2.put("7", "77");
stream.addAll(new StreamId(2), entries2, 1, false);
stream.addAll(new StreamMessageId(2), entries2, 1, false);
Map<String, String> entries3 = new LinkedHashMap<>();
entries3.put("15", "05");
entries3.put("17", "07");
stream.addAll(new StreamId(3), entries3, 1, false);
stream.addAll(new StreamMessageId(3), entries3, 1, false);
Map<StreamId, Map<String, String>> result = stream.read(10, new StreamId(0, 0));
Map<StreamMessageId, Map<String, String>> result = stream.read(10, new StreamMessageId(0, 0));
assertThat(result).hasSize(3);
assertThat(result.get(new StreamId(4))).isNull();
assertThat(result.get(new StreamId(1))).isEqualTo(entries1);
assertThat(result.get(new StreamId(2))).isEqualTo(entries2);
assertThat(result.get(new StreamId(3))).isEqualTo(entries3);
assertThat(result.get(new StreamMessageId(4))).isNull();
assertThat(result.get(new StreamMessageId(1))).isEqualTo(entries1);
assertThat(result.get(new StreamMessageId(2))).isEqualTo(entries2);
assertThat(result.get(new StreamMessageId(3))).isEqualTo(entries3);
}
@Test
@ -321,25 +333,25 @@ public class RedissonStreamTest extends BaseTest {
Map<String, String> entries1 = new LinkedHashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, true);
stream.addAll(new StreamMessageId(1), entries1, 1, true);
Map<StreamId, Map<String, String>> result = stream.read(10, new StreamId(0, 0));
Map<StreamMessageId, Map<String, String>> result = stream.read(10, new StreamMessageId(0, 0));
assertThat(result).hasSize(1);
assertThat(result.get(new StreamId(4))).isNull();
assertThat(result.get(new StreamId(1))).isEqualTo(entries1);
assertThat(result.get(new StreamMessageId(4))).isNull();
assertThat(result.get(new StreamMessageId(1))).isEqualTo(entries1);
}
@Test
public void testReadEmpty() {
RStream<String, String> stream2 = redisson.getStream("test");
Map<StreamId, Map<String, String>> result2 = stream2.read(10, new StreamId(0, 0));
Map<StreamMessageId, Map<String, String>> result2 = stream2.read(10, new StreamMessageId(0, 0));
assertThat(result2).isEmpty();
}
@Test
public void testAdd() {
RStream<String, String> stream = redisson.getStream("test1");
StreamId s = stream.add("12", "33");
StreamMessageId s = stream.add("12", "33");
assertThat(s.getId0()).isNotNegative();
assertThat(s.getId1()).isNotNegative();
assertThat(stream.size()).isEqualTo(1);
@ -353,13 +365,13 @@ public class RedissonStreamTest extends BaseTest {
Map<String, String> entries = new HashMap<>();
entries.put("6", "61");
entries.put("4", "41");
stream.addAll(new StreamId(12, 42), entries, 10, false);
stream.addAll(new StreamMessageId(12, 42), entries, 10, false);
assertThat(stream.size()).isEqualTo(1);
entries.clear();
entries.put("1", "11");
entries.put("3", "31");
stream.addAll(new StreamId(Long.MAX_VALUE), entries, 1, false);
stream.addAll(new StreamMessageId(Long.MAX_VALUE), entries, 1, false);
assertThat(stream.size()).isEqualTo(2);
}

Loading…
Cancel
Save