|
|
|
@ -82,26 +82,6 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XGROUP, params.toArray());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void createGroup(String groupName) {
|
|
|
|
|
get(createGroupAsync(groupName));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Void> createGroupAsync(String groupName) {
|
|
|
|
|
return createGroupAsync(groupName, StreamMessageId.NEWEST);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void createGroup(String groupName, StreamMessageId id) {
|
|
|
|
|
get(createGroupAsync(groupName, id));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Void> createGroupAsync(String groupName, StreamMessageId id) {
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "CREATE", getRawName(), groupName, id, "MKSTREAM");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Long> ackAsync(String groupName, StreamMessageId... ids) {
|
|
|
|
|
List<Object> params = new ArrayList<Object>();
|
|
|
|
@ -343,290 +323,6 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
|
|
|
|
|
return get(claimAsync(groupName, consumerName, idleTime, idleTimeUnit, ids));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamMessageId... ids) {
|
|
|
|
|
return readGroupAsync(groupName, consumerName, 0, ids);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId... ids) {
|
|
|
|
|
return readGroupAsync(groupName, consumerName, count, 0, null, ids);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit,
|
|
|
|
|
StreamMessageId... ids) {
|
|
|
|
|
return readGroupAsync(groupName, consumerName, 0, timeout, unit, ids);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit,
|
|
|
|
|
StreamMessageId... ids) {
|
|
|
|
|
List<Object> params = new ArrayList<Object>();
|
|
|
|
|
params.add("GROUP");
|
|
|
|
|
params.add(groupName);
|
|
|
|
|
params.add(consumerName);
|
|
|
|
|
|
|
|
|
|
if (count > 0) {
|
|
|
|
|
params.add("COUNT");
|
|
|
|
|
params.add(count);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (timeout > 0) {
|
|
|
|
|
params.add("BLOCK");
|
|
|
|
|
params.add(unit.toMillis(timeout));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
params.add("STREAMS");
|
|
|
|
|
params.add(getRawName());
|
|
|
|
|
|
|
|
|
|
if (ids.length == 0) {
|
|
|
|
|
params.add(">");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (StreamMessageId id : ids) {
|
|
|
|
|
params.add(id.toString());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (timeout > 0) {
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), codec, getServiceManager().getXReadGroupBlockingSingleCommand(), params.toArray());
|
|
|
|
|
}
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), codec, getServiceManager().getXReadGroupSingleCommand(), params.toArray());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
return readGroup(groupName, consumerName, 0, id, keyToId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
return readGroupAsync(groupName, consumerName, 0, id, keyToId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
return get(readGroupAsync(groupName, consumerName, count, id, keyToId));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
return readGroupAsync(groupName, consumerName, count, -1, null, id, keyToId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id,
|
|
|
|
|
String key2, StreamMessageId id2) {
|
|
|
|
|
return readGroupAsync(groupName, consumerName, count, timeout, unit, id, Collections.singletonMap(key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id,
|
|
|
|
|
String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
|
|
|
|
|
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
|
|
|
|
|
params.put(key2, id2);
|
|
|
|
|
params.put(key3, id3);
|
|
|
|
|
return readGroupAsync(groupName, consumerName, count, timeout, unit, id, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
return readGroupAsync(groupName, consumerName, 0, timeout, unit, id, keyToId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
return get(readGroupAsync(groupName, consumerName, count, timeout, unit, id, keyToId));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
return readGroup(groupName, consumerName, 0, timeout, unit, id, keyToId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2) {
|
|
|
|
|
return readGroupAsync(groupName, consumerName, id, Collections.singletonMap(key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3,
|
|
|
|
|
StreamMessageId id3) {
|
|
|
|
|
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
|
|
|
|
|
params.put(key2, id2);
|
|
|
|
|
params.put(key3, id3);
|
|
|
|
|
return readGroupAsync(groupName, consumerName, id, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2) {
|
|
|
|
|
return readGroupAsync(groupName, consumerName, count, id, Collections.singletonMap(key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2,
|
|
|
|
|
String key3, StreamMessageId id3) {
|
|
|
|
|
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
|
|
|
|
|
params.put(key2, id2);
|
|
|
|
|
params.put(key3, id3);
|
|
|
|
|
return readGroupAsync(groupName, consumerName, count, id, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id,
|
|
|
|
|
String key2, StreamMessageId id2) {
|
|
|
|
|
return readGroupAsync(groupName, consumerName, timeout, unit, id, Collections.singletonMap(key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id,
|
|
|
|
|
String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
|
|
|
|
|
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
|
|
|
|
|
params.put(key2, id2);
|
|
|
|
|
params.put(key3, id3);
|
|
|
|
|
return readGroupAsync(groupName, consumerName, timeout, unit, id, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2) {
|
|
|
|
|
return get(readGroupAsync(groupName, consumerName, id, key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3,
|
|
|
|
|
StreamMessageId id3) {
|
|
|
|
|
return get(readGroupAsync(groupName, consumerName, id, key2, id2, key3, id3));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2) {
|
|
|
|
|
return get(readGroupAsync(groupName, consumerName, count, id, key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2, String key3,
|
|
|
|
|
StreamMessageId id3) {
|
|
|
|
|
return get(readGroupAsync(groupName, consumerName, count, id, key2, id2, key3, id3));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2,
|
|
|
|
|
StreamMessageId id2) {
|
|
|
|
|
return get(readGroupAsync(groupName, consumerName, timeout, unit, id, key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2,
|
|
|
|
|
StreamMessageId id2, String key3, StreamMessageId id3) {
|
|
|
|
|
return get(readGroupAsync(groupName, consumerName, timeout, unit, id, key2, id2, key3, id3));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
|
|
|
|
|
StreamMessageId id2) {
|
|
|
|
|
return get(readGroupAsync(groupName, consumerName, count, timeout, unit, id, key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
|
|
|
|
|
StreamMessageId id2, String key3, StreamMessageId id3) {
|
|
|
|
|
return get(readGroupAsync(groupName, consumerName, count, timeout, unit, id, key2, id2, key3, id3));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit,
|
|
|
|
|
StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
List<Object> params = new ArrayList<Object>();
|
|
|
|
|
params.add("GROUP");
|
|
|
|
|
params.add(groupName);
|
|
|
|
|
params.add(consumerName);
|
|
|
|
|
|
|
|
|
|
if (count > 0) {
|
|
|
|
|
params.add("COUNT");
|
|
|
|
|
params.add(count);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (timeout > 0) {
|
|
|
|
|
params.add("BLOCK");
|
|
|
|
|
params.add(unit.toMillis(timeout));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
params.add("STREAMS");
|
|
|
|
|
params.add(getRawName());
|
|
|
|
|
params.addAll(keyToId.keySet());
|
|
|
|
|
|
|
|
|
|
if (id == null) {
|
|
|
|
|
params.add(">");
|
|
|
|
|
} else {
|
|
|
|
|
params.add(id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (StreamMessageId nextId : keyToId.values()) {
|
|
|
|
|
params.add(nextId.toString());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (timeout > 0) {
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), codec, getServiceManager().getXReadGroupBlockingCommand(), params.toArray());
|
|
|
|
|
}
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), codec, getServiceManager().getXReadGroupCommand(), params.toArray());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, StreamMessageId... ids) {
|
|
|
|
|
return get(readGroupAsync(groupName, consumerName, ids));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamMessageId... ids) {
|
|
|
|
|
return get(readGroupAsync(groupName, consumerName, count, ids));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId... ids) {
|
|
|
|
|
return get(readGroupAsync(groupName, consumerName, timeout, unit, ids));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit,
|
|
|
|
|
StreamMessageId... ids) {
|
|
|
|
|
return get(readGroupAsync(groupName, consumerName, count, timeout, unit, ids));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public StreamMessageId addAll(Map<K, V> entries) {
|
|
|
|
|
return addAll(entries, 0, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<StreamMessageId> addAllAsync(Map<K, V> entries) {
|
|
|
|
|
return addAllAsync(entries, 0, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void addAll(StreamMessageId id, Map<K, V> entries) {
|
|
|
|
|
addAll(id, entries, 0, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Void> addAllAsync(StreamMessageId id, Map<K, V> entries) {
|
|
|
|
|
return addAllAsync(id, entries, 0, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public StreamMessageId addAll(Map<K, V> entries, int trimLen, boolean trimStrict) {
|
|
|
|
|
return get(addAllAsync(entries, trimLen, trimStrict));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<StreamMessageId> addAllAsync(Map<K, V> entries, int trimLen, boolean trimStrict) {
|
|
|
|
|
return addAllCustomAsync(null, entries, trimLen, trimStrict);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void addAll(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
|
|
|
|
|
get(addAllAsync(id, entries, trimLen, trimStrict));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <R> RFuture<R> addAllCustomAsync(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
|
|
|
|
|
List<Object> params = new ArrayList<Object>(entries.size()*2 + 1);
|
|
|
|
|
params.add(getRawName());
|
|
|
|
@ -659,11 +355,6 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XADD_VOID, params.toArray());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Void> addAllAsync(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
|
|
|
|
|
return addAllCustomAsync(id, entries, trimLen, trimStrict);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public long size() {
|
|
|
|
|
return get(sizeAsync());
|
|
|
|
@ -739,173 +430,6 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
|
|
|
|
|
return commandExecutor.readAsync(getRawName(), codec, getServiceManager().getXReadSingleCommand(), params.toArray());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
return read(0, id, keyToId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
return readAsync(0, id, keyToId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
return get(readAsync(count, id, keyToId));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
return readAsync(count, -1, null, id, keyToId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id,
|
|
|
|
|
String key2, StreamMessageId id2) {
|
|
|
|
|
return readAsync(count, timeout, unit, id, Collections.singletonMap(key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id,
|
|
|
|
|
String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
|
|
|
|
|
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
|
|
|
|
|
params.put(key2, id2);
|
|
|
|
|
params.put(key3, id3);
|
|
|
|
|
return readAsync(count, timeout, unit, id, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
return readAsync(0, timeout, unit, id, keyToId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
return get(readAsync(count, timeout, unit, id, keyToId));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
return read(0, timeout, unit, id, keyToId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2) {
|
|
|
|
|
return readAsync(id, Collections.singletonMap(key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2, String key3,
|
|
|
|
|
StreamMessageId id3) {
|
|
|
|
|
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
|
|
|
|
|
params.put(key2, id2);
|
|
|
|
|
params.put(key3, id3);
|
|
|
|
|
return readAsync(id, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2) {
|
|
|
|
|
return readAsync(count, id, Collections.singletonMap(key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2,
|
|
|
|
|
String key3, StreamMessageId id3) {
|
|
|
|
|
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
|
|
|
|
|
params.put(key2, id2);
|
|
|
|
|
params.put(key3, id3);
|
|
|
|
|
return readAsync(count, id, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id,
|
|
|
|
|
String key2, StreamMessageId id2) {
|
|
|
|
|
return readAsync(timeout, unit, id, Collections.singletonMap(key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id,
|
|
|
|
|
String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
|
|
|
|
|
Map<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
|
|
|
|
|
params.put(key2, id2);
|
|
|
|
|
params.put(key3, id3);
|
|
|
|
|
return readAsync(timeout, unit, id, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String key2, StreamMessageId id2) {
|
|
|
|
|
return get(readAsync(id, key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String key2, StreamMessageId id2, String key3,
|
|
|
|
|
StreamMessageId id3) {
|
|
|
|
|
return get(readAsync(id, key2, id2, key3, id3));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String key2, StreamMessageId id2) {
|
|
|
|
|
return get(readAsync(count, id, key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String key2, StreamMessageId id2, String key3,
|
|
|
|
|
StreamMessageId id3) {
|
|
|
|
|
return get(readAsync(count, id, key2, id2, key3, id3));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2,
|
|
|
|
|
StreamMessageId id2) {
|
|
|
|
|
return get(readAsync(timeout, unit, id, key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2,
|
|
|
|
|
StreamMessageId id2, String key3, StreamMessageId id3) {
|
|
|
|
|
return get(readAsync(timeout, unit, id, key2, id2, key3, id3));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
|
|
|
|
|
StreamMessageId id2) {
|
|
|
|
|
return get(readAsync(count, timeout, unit, id, key2, id2));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
|
|
|
|
|
StreamMessageId id2, String key3, StreamMessageId id3) {
|
|
|
|
|
return get(readAsync(count, timeout, unit, id, key2, id2, key3, id3));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
|
|
|
|
|
List<Object> params = new ArrayList<Object>();
|
|
|
|
|
if (count > 0) {
|
|
|
|
|
params.add("COUNT");
|
|
|
|
|
params.add(count);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (timeout > 0) {
|
|
|
|
|
params.add("BLOCK");
|
|
|
|
|
params.add(unit.toMillis(timeout));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
params.add("STREAMS");
|
|
|
|
|
params.add(getRawName());
|
|
|
|
|
params.addAll(keyToId.keySet());
|
|
|
|
|
|
|
|
|
|
params.add(id);
|
|
|
|
|
for (StreamMessageId nextId : keyToId.values()) {
|
|
|
|
|
params.add(nextId.toString());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (timeout > 0) {
|
|
|
|
|
return commandExecutor.readAsync(getRawName(), codec, getServiceManager().getXReadBlockingCommand(), params.toArray());
|
|
|
|
|
}
|
|
|
|
|
return commandExecutor.readAsync(getRawName(), codec, getServiceManager().getXReadCommand(), params.toArray());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public StreamMessageId add(StreamAddArgs<K, V> args) {
|
|
|
|
|
return get(addAsync(args));
|
|
|
|
@ -976,21 +500,6 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XADD_VOID, params.toArray());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<StreamMessageId> addAsync(K key, V value) {
|
|
|
|
|
return addAsync(key, value, 0, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Void> addAsync(StreamMessageId id, K key, V value) {
|
|
|
|
|
return addAsync(id, key, value, 0, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<StreamMessageId> addAsync(K key, V value, int trimLen, boolean trimStrict) {
|
|
|
|
|
return addCustomAsync(null, key, value, trimLen, trimStrict);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <R> RFuture<R> addCustomAsync(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) {
|
|
|
|
|
List<Object> params = new LinkedList<Object>();
|
|
|
|
|
params.add(getRawName());
|
|
|
|
@ -1021,73 +530,6 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XADD_VOID, params.toArray());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Void> addAsync(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) {
|
|
|
|
|
return addCustomAsync(id, key, value, trimLen, trimStrict);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public StreamMessageId add(K key, V value) {
|
|
|
|
|
return get(addAsync(key, value));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void add(StreamMessageId id, K key, V value) {
|
|
|
|
|
get(addAsync(id, key, value));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public StreamMessageId add(K key, V value, int trimLen, boolean trimStrict) {
|
|
|
|
|
return get(addAsync(key, value, trimLen, trimStrict));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) {
|
|
|
|
|
get(addAsync(id, key, value, trimLen, trimStrict));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, StreamMessageId... ids) {
|
|
|
|
|
return readAsync(count, 0, null, ids);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<StreamMessageId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamMessageId... ids) {
|
|
|
|
|
return get(readAsync(count, timeout, unit, ids));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<StreamMessageId, Map<K, V>> read(int count, StreamMessageId... ids) {
|
|
|
|
|
return get(readAsync(count, ids));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit,
|
|
|
|
|
StreamMessageId... ids) {
|
|
|
|
|
List<Object> params = new ArrayList<Object>();
|
|
|
|
|
if (count > 0) {
|
|
|
|
|
params.add("COUNT");
|
|
|
|
|
params.add(count);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (timeout > 0) {
|
|
|
|
|
params.add("BLOCK");
|
|
|
|
|
params.add(unit.toMillis(timeout));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
params.add("STREAMS");
|
|
|
|
|
params.add(getRawName());
|
|
|
|
|
|
|
|
|
|
for (StreamMessageId id : ids) {
|
|
|
|
|
params.add(id.toString());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (timeout > 0) {
|
|
|
|
|
return commandExecutor.readAsync(getRawName(), codec, getServiceManager().getXReadBlockingSingleCommand(), params.toArray());
|
|
|
|
|
}
|
|
|
|
|
return commandExecutor.readAsync(getRawName(), codec, getServiceManager().getXReadSingleCommand(), params.toArray());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<StreamMessageId, Map<K, V>>> rangeAsync(int count, StreamMessageId startId, StreamMessageId endId) {
|
|
|
|
|
List<Object> params = new LinkedList<Object>();
|
|
|
|
@ -1128,16 +570,6 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
|
|
|
|
|
return get(rangeReversedAsync(count, startId, endId));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(StreamMessageId... ids) {
|
|
|
|
|
return readAsync(0, ids);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamMessageId... ids) {
|
|
|
|
|
return readAsync(0, timeout, unit, ids);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<StreamMessageId, Map<K, V>>> rangeAsync(StreamMessageId startId, StreamMessageId endId) {
|
|
|
|
|
return rangeAsync(0, startId, endId);
|
|
|
|
@ -1148,16 +580,6 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
|
|
|
|
|
return rangeReversedAsync(0, startId, endId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<StreamMessageId, Map<K, V>> read(StreamMessageId... ids) {
|
|
|
|
|
return read(0, ids);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<StreamMessageId, Map<K, V>> read(long timeout, TimeUnit unit, StreamMessageId... ids) {
|
|
|
|
|
return read(0, timeout, unit, ids);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Map<StreamMessageId, Map<K, V>> range(StreamMessageId startId, StreamMessageId endId) {
|
|
|
|
|
return range(0, startId, endId);
|
|
|
|
@ -1182,59 +604,6 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
|
|
|
|
|
return get(removeAsync(ids));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public long trim(TrimStrategy strategy, int threshold) {
|
|
|
|
|
return get(trimAsync(strategy, threshold));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public long trimNonStrict(TrimStrategy strategy, int threshold, int limit) {
|
|
|
|
|
return get(trimNonStrictAsync(strategy, threshold, limit));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Long> trimAsync(TrimStrategy strategy, int threshold) {
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XTRIM,
|
|
|
|
|
getRawName(), strategy.toString(), threshold);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Long> trimNonStrictAsync(TrimStrategy strategy, int threshold, int limit) {
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XTRIM,
|
|
|
|
|
getRawName(), strategy.toString(), "~", threshold, "LIMIT", limit);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public long trimNonStrict(TrimStrategy strategy, int threshold) {
|
|
|
|
|
return get(trimNonStrictAsync(strategy, threshold));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Long> trimNonStrictAsync(TrimStrategy strategy, int threshold) {
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XTRIM,
|
|
|
|
|
getRawName(), strategy.toString(), "~", threshold);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Long> trimAsync(int count) {
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XTRIM, getRawName(), "MAXLEN", count);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Long> trimNonStrictAsync(int count) {
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XTRIM, getRawName(), "MAXLEN", "~", count);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public long trim(int count) {
|
|
|
|
|
return get(trimAsync(count));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public long trimNonStrict(int count) {
|
|
|
|
|
return get(trimNonStrictAsync(count));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Void> removeGroupAsync(String groupName) {
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "DESTROY", getRawName(), groupName);
|
|
|
|
|