Improvement - simplify read() and readGroup() methods of RStream object #3471

pull/3094/merge
Nikita Koksharov 4 years ago
parent 57a6ecf558
commit 6579a85427

@ -19,10 +19,7 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import org.redisson.api.*;
import org.redisson.api.stream.StreamAddArgs;
import org.redisson.api.stream.StreamAddArgsSource;
import org.redisson.api.stream.StreamAddParams;
import org.redisson.api.stream.TrimStrategy;
import org.redisson.api.stream.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
@ -202,6 +199,97 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
return get(fastAutoClaimAsync(groupName, consumerName, idleTime, idleTimeUnit, startId, count));
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMultiReadGroupArgs args) {
return get(readGroupAsync(groupName, consumerName, args));
}
@Override
public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, StreamReadGroupArgs args) {
return get(readGroupAsync(groupName, consumerName, args));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, StreamMultiReadGroupArgs args) {
StreamReadGroupParams rp = ((StreamReadGroupSource) args).getParams();
List<Object> params = new ArrayList<>();
params.add("GROUP");
params.add(groupName);
params.add(consumerName);
if (rp.getCount() > 0) {
params.add("COUNT");
params.add(rp.getCount());
}
if (rp.getTimeout() != null) {
params.add("BLOCK");
params.add(toSeconds(rp.getTimeout().getSeconds(), TimeUnit.SECONDS)*1000);
}
if (rp.isNoAck()) {
params.add("NOACK");
}
params.add("STREAMS");
params.add(getName());
params.addAll(rp.getOffsets().keySet());
if (rp.getId1() == null) {
params.add(">");
} else {
params.add(rp.getId1().toString());
}
for (StreamMessageId nextId : rp.getOffsets().values()) {
params.add(nextId.toString());
}
if (rp.getTimeout() != null) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.XREADGROUP_BLOCKING, params.toArray());
}
return commandExecutor.writeAsync(getName(), codec, RedisCommands.XREADGROUP, params.toArray());
}
@Override
public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamReadGroupArgs args) {
StreamReadGroupParams rp = ((StreamReadGroupSource) args).getParams();
List<Object> params = new ArrayList<>();
params.add("GROUP");
params.add(groupName);
params.add(consumerName);
if (rp.getCount() > 0) {
params.add("COUNT");
params.add(rp.getCount());
}
if (rp.getTimeout() != null) {
params.add("BLOCK");
params.add(toSeconds(rp.getTimeout().getSeconds(), TimeUnit.SECONDS)*1000);
}
if (rp.isNoAck()) {
params.add("NOACK");
}
params.add("STREAMS");
params.add(getName());
if (rp.getId1() == null) {
params.add(">");
} else {
params.add(rp.getId1().toString());
}
if (rp.getTimeout() != null) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.XREADGROUP_BLOCKING_SINGLE, params.toArray());
}
return commandExecutor.writeAsync(getName(), codec, RedisCommands.XREADGROUP_SINGLE, params.toArray());
}
@Override
public RFuture<FastAutoClaimResult> fastAutoClaimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId startId, int count) {
List<Object> params = new ArrayList<>();
@ -569,7 +657,72 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
public RFuture<Long> sizeAsync() {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XLEN, getName());
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMultiReadArgs args) {
return get(readAsync(args));
}
@Override
public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMultiReadArgs args) {
StreamReadParams rp = ((StreamReadSource) args).getParams();
List<Object> params = new ArrayList<>();
if (rp.getCount() > 0) {
params.add("COUNT");
params.add(rp.getCount());
}
if (rp.getTimeout() != null) {
params.add("BLOCK");
params.add(toSeconds(rp.getTimeout().getSeconds(), TimeUnit.SECONDS)*1000);
}
params.add("STREAMS");
params.add(getName());
params.addAll(rp.getOffsets().keySet());
params.add(rp.getId1());
for (StreamMessageId nextId : rp.getOffsets().values()) {
params.add(nextId.toString());
}
if (rp.getTimeout() != null) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREAD_BLOCKING, params.toArray());
}
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREAD, params.toArray());
}
@Override
public Map<StreamMessageId, Map<K, V>> read(StreamReadArgs args) {
return get(readAsync(args));
}
@Override
public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(StreamReadArgs args) {
StreamReadParams rp = ((StreamReadSource) args).getParams();
List<Object> params = new ArrayList<Object>();
if (rp.getCount() > 0) {
params.add("COUNT");
params.add(rp.getCount());
}
if (rp.getTimeout() != null) {
params.add("BLOCK");
params.add(toSeconds(rp.getTimeout().getSeconds(), TimeUnit.SECONDS)*1000);
}
params.add("STREAMS");
params.add(getName());
params.add(rp.getId1());
if (rp.getTimeout() != null) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREAD_BLOCKING_SINGLE, params.toArray());
}
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREAD_SINGLE, params.toArray());
}
@Override
public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, Map<String, StreamMessageId> keyToId) {
return read(0, id, keyToId);

@ -15,8 +15,7 @@
*/
package org.redisson.api;
import org.redisson.api.stream.StreamAddArgs;
import org.redisson.api.stream.TrimStrategy;
import org.redisson.api.stream.*;
import java.util.List;
import java.util.Map;
@ -320,228 +319,136 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
FastAutoClaimResult fastAutoClaim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId startId, int count);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
* Read stream data from consumer group and multiple streams including current.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
* @param args - method arguments object
* @return stream data mapped by stream name and Stream Message ID
*/
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, StreamMessageId... ids);
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMultiReadGroupArgs args);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param ids - collection of Stream Message IDs
* Read stream data from consumer group and current stream only.
*
* @param args - method arguments object
* @return stream data mapped by Stream Message ID
*/
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, StreamReadGroupArgs args);
/*
* Use readGroup(String, String, StreamReadGroupArgs) method instead
*
*/
@Deprecated
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, StreamMessageId... ids);
/*
* Use readGroup(String, String, StreamReadGroupArgs) method instead
*
*/
@Deprecated
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamMessageId... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
* Waits for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
/*
* Use readGroup(String, String, StreamReadGroupArgs) method instead
*
*/
@Deprecated
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
* Waits for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
/*
* Use readGroup(String, String, StreamReadGroupArgs) method instead
*
*/
@Deprecated
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2, String key3, StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2,
StreamMessageId id2, String key3, StreamMessageId id3);
@ -623,193 +530,133 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
*/
@Deprecated
void addAll(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream Message IDs.
*
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
* Read stream data from multiple streams including current.
*
* @param args - method arguments object
* @return stream data mapped by stream name and Stream Message ID
*/
Map<StreamMessageId, Map<K, V>> read(StreamMessageId... ids);
Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMultiReadArgs args);
/**
* Read stream data by specified collection of Stream Message IDs.
*
* @param count - stream data size limit
* @param ids - collection of Stream Message IDs
* Read stream data from current stream only.
*
* @param args - method arguments object
* @return stream data mapped by Stream Message ID
*/
Map<StreamMessageId, Map<K, V>> read(StreamReadArgs args);
/*
* Use read(StreamReadArgs) method instead
*
*/
@Deprecated
Map<StreamMessageId, Map<K, V>> read(StreamMessageId... ids);
/*
* Use read(StreamReadArgs) method instead
*
*/
@Deprecated
Map<StreamMessageId, Map<K, V>> read(int count, StreamMessageId... ids);
/**
* Read stream data by specified collection of Stream Message IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
/*
* Use read(StreamReadArgs) method instead
*
*/
@Deprecated
Map<StreamMessageId, Map<K, V>> read(long timeout, TimeUnit unit, StreamMessageId... ids);
/**
* Read stream data by specified collection of Stream Message IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID
/*
* Use read(StreamReadArgs) method instead
*
*/
@Deprecated
Map<StreamMessageId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/**
* Read stream data by specified stream name including this stream.
*
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
*
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified Stream Message ID mapped by name including this stream.
*
* @param id - id of this stream
* @param nameToId - Stream Message ID mapped by name
* @return stream data mapped by key and Stream Message ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified Stream Message ID mapped by name including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param nameToId - Stream Message ID mapped by name
* @return stream data mapped by key and Stream Message ID
/*
* Use read(StreamReadArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified Stream Message ID mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - Stream Message ID mapped by name
* @return stream data mapped by key and Stream Message ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified Stream Message ID mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - Stream Message ID mapped by name
* @return stream data mapped by key and Stream Message ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**

@ -15,8 +15,7 @@
*/
package org.redisson.api;
import org.redisson.api.stream.StreamAddArgs;
import org.redisson.api.stream.TrimStrategy;
import org.redisson.api.stream.*;
import java.util.List;
import java.util.Map;
@ -320,7 +319,23 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @return list of Stream Message IDs
*/
RFuture<List<StreamMessageId>> fastClaimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId... ids);
/**
* Read stream data from consumer group and multiple streams including current.
*
* @param args - method arguments object
* @return stream data mapped by stream name and Stream Message ID
*/
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, StreamMultiReadGroupArgs args);
/**
* Read stream data from consumer group and current stream only.
*
* @param args - method arguments object
* @return stream data mapped by Stream Message ID
*/
RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamReadGroupArgs args);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
*
@ -609,193 +624,133 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
*/
@Deprecated
RFuture<Void> addAllAsync(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream IDs.
*
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* Read stream data from multiple streams including current.
*
* @param args - method arguments object
* @return stream data mapped by stream name and Stream Message ID
*/
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMultiReadArgs args);
/**
* Read stream data from current stream only.
*
* @param args - method arguments object
* @return stream data mapped by Stream Message ID
*/
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(StreamReadArgs args);
/*
* Use readAsync(StreamReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(StreamMessageId... ids);
/**
* Read stream data by specified collection of Stream IDs.
*
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use readAsync(StreamReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, StreamMessageId... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use readAsync(StreamReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamMessageId... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use readAsync(StreamReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/**
* Read stream data by specified stream name including this stream.
*
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
/*
* Use readAsync(StreamMultiReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
*
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
/*
* Use readAsync(StreamMultiReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
*
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
/*
* Use readAsync(StreamMultiReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
/*
* Use readAsync(StreamMultiReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
/*
* Use readAsync(StreamMultiReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
/*
* Use readAsync(StreamMultiReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
/*
* Use readAsync(StreamMultiReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
/*
* Use readAsync(StreamMultiReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
/*
* Use readAsync(StreamMultiReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
/*
* Use readAsync(StreamMultiReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
/*
* Use readAsync(StreamMultiReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
/*
* Use readAsync(StreamMultiReadArgs) method instead
*
*/
@Deprecated
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**

@ -19,8 +19,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.redisson.api.stream.StreamAddArgs;
import org.redisson.api.stream.TrimStrategy;
import org.redisson.api.stream.*;
import reactor.core.publisher.Mono;
/**
@ -271,212 +270,128 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
Mono<FastAutoClaimResult> fastAutoClaim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId startId, int count);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* Read stream data from consumer group and multiple streams including current.
*
* @param args - method arguments object
* @return stream data mapped by stream name and Stream Message ID
*/
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, StreamMultiReadGroupArgs args);
/**
* Read stream data from consumer group and current stream only.
*
* @param args - method arguments object
* @return stream data mapped by Stream Message ID
*/
Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamReadGroupArgs args);
/*
* Use readGroup(String, String, StreamReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
*
* @param groupName - name of group
* @param consumerName - consumer name
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use readGroup(String, String, StreamReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - consumer name
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use readGroup(String, String, StreamReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - consumer name
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use readGroup(String, String, StreamReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2,
String key3, StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3);
@ -558,193 +473,133 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
*/
@Deprecated
Mono<Void> addAll(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream IDs.
*
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* Read stream data from multiple streams including current.
*
* @param args - method arguments object
* @return stream data mapped by stream name and Stream Message ID
*/
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMultiReadArgs args);
/**
* Read stream data from current stream only.
*
* @param args - method arguments object
* @return stream data mapped by Stream Message ID
*/
Mono<Map<StreamMessageId, Map<K, V>>> read(StreamReadArgs args);
/*
* Use read(StreamReadArgs) method instead
*
*/
@Deprecated
Mono<Map<StreamMessageId, Map<K, V>>> read(StreamMessageId... ids);
/**
* Read stream data by specified collection of Stream IDs.
*
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use read(StreamReadArgs) method instead
*
*/
@Deprecated
Mono<Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use read(StreamReadArgs) method instead
*
*/
@Deprecated
Mono<Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use read(StreamReadArgs) method instead
*
*/
@Deprecated
Mono<Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/**
* Read stream data by specified stream name including this stream.
*
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
*
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
*
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**

@ -21,8 +21,7 @@ import java.util.concurrent.TimeUnit;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Single;
import org.redisson.api.stream.StreamAddArgs;
import org.redisson.api.stream.TrimStrategy;
import org.redisson.api.stream.*;
/**
* Reactive interface for Redis Stream object.
@ -272,212 +271,128 @@ public interface RStreamRx<K, V> extends RExpirableRx {
Single<FastAutoClaimResult> fastAutoClaim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId startId, int count);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* Read stream data from consumer group and multiple streams including current.
*
* @param args - method arguments object
* @return stream data mapped by stream name and Stream Message ID
*/
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, StreamMultiReadGroupArgs args);
/**
* Read stream data from consumer group and current stream only.
*
* @param args - method arguments object
* @return stream data mapped by Stream Message ID
*/
Single<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamReadGroupArgs args);
/*
* Use readGroup(String, String, StreamReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use readGroup(String, String, StreamReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use readGroup(String, String, StreamReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use readGroup(String, String, StreamReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3,
StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2,
String key3, StreamMessageId id3);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
* Waits for the first stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - starting message id for this stream
* @param key2 - name of second stream
* @param id2 - starting message id for second stream
* @param key3 - name of third stream
* @param id3 - starting message id for third stream
* @return stream data mapped by key and Stream Message ID
/*
* Use readGroup(String, String, StreamMultiReadGroupArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id,
String key2, StreamMessageId id2, String key3, StreamMessageId id3);
@ -559,193 +474,133 @@ public interface RStreamRx<K, V> extends RExpirableRx {
*/
@Deprecated
Completable addAll(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream IDs.
*
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
* Read stream data from multiple streams including current.
*
* @param args - method arguments object
* @return stream data mapped by stream name and Stream Message ID
*/
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMultiReadArgs args);
/**
* Read stream data from current stream only.
*
* @param args - method arguments object
* @return stream data mapped by Stream Message ID
*/
Single<Map<StreamMessageId, Map<K, V>>> read(StreamReadArgs args);
/*
* Use read(StreamReadArgs) method instead
*
*/
@Deprecated
Single<Map<StreamMessageId, Map<K, V>>> read(StreamMessageId... ids);
/**
* Read stream data by specified collection of Stream IDs.
*
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use read(StreamReadArgs) method instead
*
*/
@Deprecated
Single<Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use read(StreamReadArgs) method instead
*
*/
@Deprecated
Single<Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
/*
* Use read(StreamReadArgs) method instead
*
*/
@Deprecated
Single<Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/**
* Read stream data by specified stream name including this stream.
*
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
*
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
*
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> read(StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**
* Read stream data by specified stream name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2);
/**
* Read stream data by specified stream names including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String name2, StreamMessageId id2, String name3, StreamMessageId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
/*
* Use read(StreamMultiReadArgs) method instead
*
*/
@Deprecated
Single<Map<String, Map<StreamMessageId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/**

@ -0,0 +1,52 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.stream;
import org.redisson.api.StreamMessageId;
import java.time.Duration;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
class BaseStreamMultiReadArgs implements StreamMultiReadArgs, StreamReadSource {
private final StreamReadParams params;
BaseStreamMultiReadArgs(StreamMessageId id1, Map<String, StreamMessageId> offsets) {
this.params = new StreamReadParams(id1, offsets);
}
@Override
public StreamMultiReadArgs count(int count) {
params.setCount(count);
return this;
}
@Override
public StreamMultiReadArgs timeout(Duration timeout) {
params.setTimeout(timeout);
return this;
}
@Override
public StreamReadParams getParams() {
return params;
}
}

@ -0,0 +1,58 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.stream;
import org.redisson.api.StreamMessageId;
import java.time.Duration;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
class BaseStreamMultiReadGroupArgs implements StreamMultiReadGroupArgs, StreamReadGroupSource {
private final StreamReadGroupParams params;
BaseStreamMultiReadGroupArgs(StreamMessageId id1, Map<String, StreamMessageId> offsets) {
this.params = new StreamReadGroupParams(id1, offsets);
}
@Override
public StreamMultiReadGroupArgs noAck() {
params.setNoAck(true);
return this;
}
@Override
public StreamMultiReadGroupArgs count(int count) {
params.setCount(count);
return this;
}
@Override
public StreamMultiReadGroupArgs timeout(Duration timeout) {
params.setTimeout(timeout);
return this;
}
@Override
public StreamReadGroupParams getParams() {
return params;
}
}

@ -0,0 +1,52 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.stream;
import org.redisson.api.StreamMessageId;
import java.time.Duration;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
class BaseStreamReadArgs implements StreamReadArgs, StreamReadSource {
private final StreamReadParams params;
BaseStreamReadArgs(StreamMessageId id1, Map<String, StreamMessageId> offsets) {
this.params = new StreamReadParams(id1, offsets);
}
@Override
public StreamReadArgs count(int count) {
params.setCount(count);
return this;
}
@Override
public StreamReadArgs timeout(Duration timeout) {
params.setTimeout(timeout);
return this;
}
@Override
public StreamReadParams getParams() {
return params;
}
}

@ -0,0 +1,58 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.stream;
import org.redisson.api.StreamMessageId;
import java.time.Duration;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
class BaseStreamReadGroupArgs implements StreamReadGroupArgs, StreamReadGroupSource {
private final StreamReadGroupParams params;
BaseStreamReadGroupArgs(StreamMessageId id1, Map<String, StreamMessageId> offsets) {
this.params = new StreamReadGroupParams(id1, offsets);
}
@Override
public StreamReadGroupArgs noAck() {
params.setNoAck(true);
return this;
}
@Override
public StreamReadGroupArgs count(int count) {
params.setCount(count);
return this;
}
@Override
public StreamReadGroupArgs timeout(Duration timeout) {
params.setTimeout(timeout);
return this;
}
@Override
public StreamReadGroupParams getParams() {
return params;
}
}

@ -0,0 +1,162 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.stream;
import org.redisson.api.StreamMessageId;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Arguments object for RStream.read() methods.
*
* @author Nikita Koksharov
*
*/
public interface StreamMultiReadArgs {
/**
* Defines stream data size limit.
*
* @param count - stream data size limit
* @return arguments object
*/
StreamMultiReadArgs count(int count);
/**
* Defines time interval to wait for stream data availability.
*
* @param timeout - timeout duration
* @return arguments object
*/
StreamMultiReadArgs timeout(Duration timeout);
/**
* Defines last stream ids received from all Streams
* including current one.
* <p>
* Read stream data from all defined streams
* with ids greater than defined ids.
*
* @param id1 - last stream id of current stream
* @param stream2 - name of 2nd stream
* @param id2 - last stream id of 2nd stream
* @return arguments object
*/
static StreamMultiReadArgs greaterThan(StreamMessageId id1,
String stream2, StreamMessageId id2) {
return greaterThan(id1, Collections.singletonMap(stream2, id2));
}
/**
* Defines last stream ids received from all Streams
* including current one.
* <p>
* Read stream data from all defined streams
* with ids greater than defined ids.
*
* @param id1 - last stream id of current stream
* @param stream2 - name of 2nd stream
* @param id2 - last stream id of 2nd stream
* @param stream3 - name of 3rd stream
* @param id3 - last stream id of 3rd stream
* @return arguments object
*/
static StreamMultiReadArgs greaterThan(StreamMessageId id1,
String stream2, StreamMessageId id2,
String stream3, StreamMessageId id3) {
Map<String, StreamMessageId> map = new HashMap<>();
map.put(stream2, id2);
map.put(stream3, id3);
return greaterThan(id1, map);
}
/**
* Defines last stream ids received from all Streams
* including current one.
* <p>
* Read stream data from all defined streams
* with ids greater than defined ids.
*
* @param id1 - last stream id of current stream
* @param stream2 - name of 2nd stream
* @param id2 - last stream id of 2nd stream
* @param stream3 - name of 3rd stream
* @param id3 - last stream id of 3rd stream
* @param stream4 - name of 4th stream
* @param id4 - last stream id of 4th stream
* @return arguments object
*/
static StreamMultiReadArgs greaterThan(StreamMessageId id1,
String stream2, StreamMessageId id2,
String stream3, StreamMessageId id3,
String stream4, StreamMessageId id4) {
Map<String, StreamMessageId> map = new HashMap<>();
map.put(stream2, id2);
map.put(stream3, id3);
map.put(stream4, id4);
return greaterThan(id1, map);
}
/**
* Defines last stream ids received from all Streams
* including current one.
* <p>
* Read stream data from all defined streams
* with ids greater than defined ids.
*
* @param id1 - last stream id of current stream
* @param stream2 - name of 2nd stream
* @param id2 - last stream id of 2nd stream
* @param stream3 - name of 3rd stream
* @param id3 - last stream id of 3rd stream
* @param stream4 - name of 4th stream
* @param id4 - last stream id of 4th stream
* @param stream5 - name of 4th stream
* @param id5 - last stream id of 4th stream
* @return arguments object
*/
static StreamMultiReadArgs greaterThan(StreamMessageId id1,
String stream2, StreamMessageId id2,
String stream3, StreamMessageId id3,
String stream4, StreamMessageId id4,
String stream5, StreamMessageId id5) {
Map<String, StreamMessageId> map = new HashMap<>();
map.put(stream2, id2);
map.put(stream3, id3);
map.put(stream4, id4);
map.put(stream5, id5);
return greaterThan(id1, map);
}
/**
* Defines last stream ids received from all Streams
* including current one.
* <p>
* Read stream data from all defined streams
* with ids greater than defined ids.
*
* @param id1 - last stream id of current stream
* @param offsets - last stream id mapped by stream name
* @return arguments object
*/
static StreamMultiReadArgs greaterThan(StreamMessageId id1, Map<String, StreamMessageId> offsets) {
return new BaseStreamMultiReadArgs(id1, offsets);
}
}

@ -0,0 +1,154 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.stream;
import org.redisson.api.StreamMessageId;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Arguments object for RStream.readGroup() methods.
*
* @author Nikita Koksharov
*
*/
public interface StreamMultiReadGroupArgs {
/**
* Defines avoid of adding messages to Pending Entries List.
*
* @return arguments object
*/
StreamMultiReadGroupArgs noAck();
/**
* Defines stream data size limit.
*
* @param count - stream data size limit
* @return arguments object
*/
StreamMultiReadGroupArgs count(int count);
/**
* Defines time interval to wait for stream data availability.
*
* @param timeout - timeout duration
* @return arguments object
*/
StreamMultiReadGroupArgs timeout(Duration timeout);
/**
* Defines to return messages of all Streams
* with ids greater than defined message ids.
*
* @param id1 - last stream id of current stream
* @param stream2 - name of 2nd stream
* @param id2 - last stream id of 2nd stream
* @return arguments object
*/
static StreamMultiReadGroupArgs greaterThan(StreamMessageId id1,
String stream2, StreamMessageId id2) {
return greaterThan(id1, Collections.singletonMap(stream2, id2));
}
/**
* Defines to return messages of all Streams
* with ids greater than defined message ids.
*
* @param id1 - last stream id of current stream
* @param stream2 - name of 2nd stream
* @param id2 - last stream id of 2nd stream
* @param stream3 - name of 3rd stream
* @param id3 - last stream id of 3rd stream
* @return arguments object
*/
static StreamMultiReadGroupArgs greaterThan(StreamMessageId id1,
String stream2, StreamMessageId id2,
String stream3, StreamMessageId id3) {
Map<String, StreamMessageId> map = new HashMap<>();
map.put(stream2, id2);
map.put(stream3, id3);
return greaterThan(id1, map);
}
/**
* Defines to return messages of all Streams
* with ids greater than defined message ids.
*
* @param id1 - last stream id of current stream
* @param stream2 - name of 2nd stream
* @param id2 - last stream id of 2nd stream
* @param stream3 - name of 3rd stream
* @param id3 - last stream id of 3rd stream
* @param stream4 - name of 4th stream
* @param id4 - last stream id of 4th stream
* @return arguments object
*/
static StreamMultiReadGroupArgs greaterThan(StreamMessageId id1,
String stream2, StreamMessageId id2,
String stream3, StreamMessageId id3,
String stream4, StreamMessageId id4) {
Map<String, StreamMessageId> map = new HashMap<>();
map.put(stream2, id2);
map.put(stream3, id3);
map.put(stream4, id4);
return greaterThan(id1, map);
}
/**
* Defines to return messages of all Streams
* with ids greater than defined message ids.
*
* @param id1 - last stream id of current stream
* @param stream2 - name of 2nd stream
* @param id2 - last stream id of 2nd stream
* @param stream3 - name of 3rd stream
* @param id3 - last stream id of 3rd stream
* @param stream4 - name of 4th stream
* @param id4 - last stream id of 4th stream
* @param stream5 - name of 4th stream
* @param id5 - last stream id of 4th stream
* @return arguments object
*/
static StreamMultiReadGroupArgs greaterThan(StreamMessageId id1,
String stream2, StreamMessageId id2,
String stream3, StreamMessageId id3,
String stream4, StreamMessageId id4,
String stream5, StreamMessageId id5) {
Map<String, StreamMessageId> map = new HashMap<>();
map.put(stream2, id2);
map.put(stream3, id3);
map.put(stream4, id4);
map.put(stream5, id5);
return greaterThan(id1, map);
}
/**
* Defines to return messages of all Streams
* with ids greater than defined message ids.
*
* @param id - last stream id of current stream
* @param offsets - last stream id mapped by stream name
* @return arguments object
*/
static StreamMultiReadGroupArgs greaterThan(StreamMessageId id, Map<String, StreamMessageId> offsets) {
return new BaseStreamMultiReadGroupArgs(id, offsets);
}
}

@ -0,0 +1,58 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.stream;
import org.redisson.api.StreamMessageId;
import java.time.Duration;
import java.util.Collections;
/**
* Arguments object for RStream.read() methods.
*
* @author Nikita Koksharov
*
*/
public interface StreamReadArgs {
/**
* Defines stream data size limit.
*
* @param count - stream data size limit
* @return arguments object
*/
StreamReadArgs count(int count);
/**
* Defines time interval to wait for stream data availability.
*
* @param timeout - timeout duration
* @return arguments object
*/
StreamReadArgs timeout(Duration timeout);
/**
* Defines last stream id received from current Stream.
* Read stream data with ids greater than defined id.
*
* @param id0 - last stream id of current stream
* @return arguments object
*/
static StreamReadArgs greaterThan(StreamMessageId id0) {
return new BaseStreamReadArgs(id0, Collections.emptyMap());
}
}

@ -0,0 +1,75 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.stream;
import org.redisson.api.StreamMessageId;
import java.time.Duration;
import java.util.Collections;
/**
* Arguments object for RStream.readGroup() methods.
*
* @author Nikita Koksharov
*
*/
public interface StreamReadGroupArgs {
/**
* Defines avoid of adding messages to Pending Entries List.
*
* @return arguments object
*/
StreamReadGroupArgs noAck();
/**
* Defines stream data size limit.
*
* @param count - stream data size limit
* @return arguments object
*/
StreamReadGroupArgs count(int count);
/**
* Defines time interval to wait for stream data availability.
*
* @param timeout - timeout duration
* @return arguments object
*/
StreamReadGroupArgs timeout(Duration timeout);
/**
* Defines to return messages of current Stream
* never delivered to any other consumer.
*
* @return arguments object
*/
static StreamReadGroupArgs neverDelivered() {
return greaterThan(null);
}
/**
* Defines to return messages of current Stream
* with ids greater than defined message id.
*
* @param id - message id
* @return arguments object
*/
static StreamReadGroupArgs greaterThan(StreamMessageId id) {
return new BaseStreamReadGroupArgs(id, Collections.emptyMap());
}
}

@ -0,0 +1,42 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.stream;
import org.redisson.api.StreamMessageId;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
public class StreamReadGroupParams extends StreamReadParams {
private boolean noAck;
public StreamReadGroupParams(StreamMessageId id1, Map<String, StreamMessageId> offsets) {
super(id1, offsets);
}
public boolean isNoAck() {
return noAck;
}
public void setNoAck(boolean noAck) {
this.noAck = noAck;
}
}

@ -0,0 +1,27 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.stream;
/**
*
* @author Nikita Koksharov
*
*/
public interface StreamReadGroupSource {
StreamReadGroupParams getParams();
}

@ -0,0 +1,63 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.stream;
import org.redisson.api.StreamMessageId;
import java.time.Duration;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
public class StreamReadParams {
private final StreamMessageId id1;
private final Map<String, StreamMessageId> offsets;
private int count;
private Duration timeout;
public StreamReadParams(StreamMessageId id1, Map<String, StreamMessageId> offsets) {
this.id1 = id1;
this.offsets = offsets;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
public Duration getTimeout() {
return timeout;
}
public void setTimeout(Duration timeout) {
this.timeout = timeout;
}
public StreamMessageId getId1() {
return id1;
}
public Map<String, StreamMessageId> getOffsets() {
return offsets;
}
}

@ -0,0 +1,27 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.stream;
/**
*
* @author Nikita Koksharov
*
*/
public interface StreamReadSource {
StreamReadParams getParams();
}

@ -2,6 +2,7 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@ -11,8 +12,7 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.redisson.api.*;
import org.redisson.api.stream.StreamAddArgs;
import org.redisson.api.stream.TrimStrategy;
import org.redisson.api.stream.*;
import org.redisson.client.RedisException;
public class RedissonStreamTest extends BaseTest {
@ -28,13 +28,13 @@ public class RedissonStreamTest extends BaseTest {
StreamMessageId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamMessageId id2 = stream.add(StreamAddArgs.entry("2", "2"));
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
assertThat(s.size()).isEqualTo(2);
StreamMessageId id3 = stream.add(StreamAddArgs.entry("3", "33"));
StreamMessageId id4 = stream.add(StreamAddArgs.entry("4", "44"));
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2", StreamReadGroupArgs.neverDelivered());
assertThat(s2.size()).isEqualTo(2);
AutoClaimResult<String, String> res = stream.autoClaim("testGroup", "consumer1", 1, TimeUnit.MILLISECONDS, id3, 2);
@ -56,13 +56,13 @@ public class RedissonStreamTest extends BaseTest {
StreamMessageId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamMessageId id2 = stream.add(StreamAddArgs.entry("2", "2"));
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
assertThat(s.size()).isEqualTo(2);
StreamMessageId id3 = stream.add(StreamAddArgs.entry("3", "3"));
StreamMessageId id4 = stream.add(StreamAddArgs.entry("4", "4"));
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2", StreamReadGroupArgs.neverDelivered());
assertThat(s2.size()).isEqualTo(2);
List<PendingEntry> list = stream.listPending("testGroup", StreamMessageId.MIN, StreamMessageId.MAX, 1, TimeUnit.MILLISECONDS, 10);
@ -117,12 +117,12 @@ public class RedissonStreamTest extends BaseTest {
StreamMessageId id2 = stream.add(StreamAddArgs.entry("2", "2"));
System.out.println("id2 " + id2);
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
assertThat(s.size()).isEqualTo(2);
stream.updateGroupMessageId("testGroup", id);
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2", StreamReadGroupArgs.neverDelivered());
assertThat(s2.size()).isEqualTo(2);
}
@ -137,7 +137,7 @@ public class RedissonStreamTest extends BaseTest {
StreamMessageId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamMessageId id2 = stream.add(StreamAddArgs.entry("2", "2"));
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
assertThat(s.size()).isEqualTo(2);
assertThat(stream.removeConsumer("testGroup", "consumer1")).isEqualTo(2);
@ -157,7 +157,7 @@ public class RedissonStreamTest extends BaseTest {
stream.removeGroup("testGroup");
stream.readGroup("testGroup", "consumer1");
stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
}
@Test
@ -183,13 +183,13 @@ public class RedissonStreamTest extends BaseTest {
StreamMessageId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamMessageId id2 = stream.add(StreamAddArgs.entry("2", "2"));
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
assertThat(s.size()).isEqualTo(2);
StreamMessageId id3 = stream.add(StreamAddArgs.entry("3", "33"));
StreamMessageId id4 = stream.add(StreamAddArgs.entry("4", "44"));
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2", StreamReadGroupArgs.neverDelivered());
assertThat(s2.size()).isEqualTo(2);
stream.remove(id3);
@ -210,13 +210,13 @@ public class RedissonStreamTest extends BaseTest {
StreamMessageId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamMessageId id2 = stream.add(StreamAddArgs.entry("2", "2"));
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
assertThat(s.size()).isEqualTo(2);
StreamMessageId id3 = stream.add(StreamAddArgs.entry("3", "33"));
StreamMessageId id4 = stream.add(StreamAddArgs.entry("4", "44"));
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2", StreamReadGroupArgs.neverDelivered());
assertThat(s2.size()).isEqualTo(2);
Map<StreamMessageId, Map<String, String>> res = stream.claim("testGroup", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4);
@ -239,13 +239,13 @@ public class RedissonStreamTest extends BaseTest {
StreamMessageId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamMessageId id2 = stream.add(StreamAddArgs.entry("2", "2"));
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup3", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup3", "consumer1", StreamReadGroupArgs.neverDelivered());
assertThat(s.size()).isEqualTo(2);
StreamMessageId id3 = stream.add(StreamAddArgs.entry("3", "33"));
StreamMessageId id4 = stream.add(StreamAddArgs.entry("4", "44"));
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup3", "consumer2");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup3", "consumer2", StreamReadGroupArgs.neverDelivered());
assertThat(s2.size()).isEqualTo(2);
FastAutoClaimResult res = stream.fastAutoClaim("testGroup3", "consumer1", 1, TimeUnit.MILLISECONDS, id3, 10);
@ -264,13 +264,13 @@ public class RedissonStreamTest extends BaseTest {
StreamMessageId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamMessageId id2 = stream.add(StreamAddArgs.entry("2", "2"));
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup3", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup3", "consumer1", StreamReadGroupArgs.neverDelivered());
assertThat(s.size()).isEqualTo(2);
StreamMessageId id3 = stream.add(StreamAddArgs.entry("3", "33"));
StreamMessageId id4 = stream.add(StreamAddArgs.entry("4", "44"));
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup3", "consumer2");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup3", "consumer2", StreamReadGroupArgs.neverDelivered());
assertThat(s2.size()).isEqualTo(2);
List<StreamMessageId> res = stream.fastClaim("testGroup3", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4);
@ -289,13 +289,13 @@ public class RedissonStreamTest extends BaseTest {
StreamMessageId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamMessageId id2 = stream.add(StreamAddArgs.entry("2", "2"));
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
assertThat(s.size()).isEqualTo(2);
StreamMessageId id3 = stream.add(StreamAddArgs.entry("3", "3"));
StreamMessageId id4 = stream.add(StreamAddArgs.entry("4", "4"));
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2", StreamReadGroupArgs.neverDelivered());
assertThat(s2.size()).isEqualTo(2);
PendingResult pi = stream.listPending("testGroup");
@ -332,7 +332,7 @@ public class RedissonStreamTest extends BaseTest {
StreamMessageId id1 = stream.add(StreamAddArgs.entry("11", "12"));
StreamMessageId id2 = stream.add(StreamAddArgs.entry("21", "22"));
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
assertThat(s.size()).isEqualTo(2);
Map<StreamMessageId, Map<String, String>> pres = stream.pendingRange("testGroup", StreamMessageId.MIN, StreamMessageId.MAX, 10);
@ -360,7 +360,7 @@ public class RedissonStreamTest extends BaseTest {
StreamMessageId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamMessageId id2 = stream.add(StreamAddArgs.entry("2", "2"));
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
assertThat(s.size()).isEqualTo(2);
assertThat(stream.ack("testGroup", id1, id2)).isEqualTo(2);
@ -384,7 +384,7 @@ public class RedissonStreamTest extends BaseTest {
StreamMessageId id22 = stream2.add(StreamAddArgs.entry("2", "2"));
StreamMessageId id23 = stream2.add(StreamAddArgs.entry("3", "3"));
Map<String, Map<StreamMessageId, Map<String, String>>> s2 = stream1.readGroup("testGroup", "consumer1", id11, Collections.singletonMap("test2", id21));
Map<String, Map<StreamMessageId, Map<String, String>>> s2 = stream1.readGroup("testGroup", "consumer1", StreamMultiReadGroupArgs.greaterThan(id11, Collections.singletonMap("test2", id21)));
assertThat(s2).isEmpty();
}
@ -400,7 +400,7 @@ public class RedissonStreamTest extends BaseTest {
stream.add(StreamAddArgs.entry("2", "2"));
stream.add(StreamAddArgs.entry("3", "3"));
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1", 3, 5, TimeUnit.SECONDS);
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered().count(3).timeout(Duration.ofSeconds(5)));
assertThat(s.values().iterator().next().keySet()).containsAnyOf("1", "2", "3");
assertThat(s.size()).isEqualTo(3);
@ -429,7 +429,7 @@ public class RedissonStreamTest extends BaseTest {
stream.createGroup("testGroup", StreamMessageId.ALL);
stream.add(StreamAddArgs.entry("1", "2"));
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
assertThat(s).hasSize(1);
}
@ -445,7 +445,7 @@ public class RedissonStreamTest extends BaseTest {
stream.add(StreamAddArgs.entry("2", "2"));
stream.add(StreamAddArgs.entry("3", "3"));
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
assertThat(s.values().iterator().next().keySet()).containsAnyOf("1", "2", "3");
assertThat(s.size()).isEqualTo(3);
@ -453,14 +453,14 @@ public class RedissonStreamTest extends BaseTest {
stream.add(StreamAddArgs.entry("2", "2"));
stream.add(StreamAddArgs.entry("3", "3"));
Map<StreamMessageId, Map<String, String>> s1 = stream.readGroup("testGroup", "consumer1", 1);
Map<StreamMessageId, Map<String, String>> s1 = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered().count(1));
assertThat(s1.size()).isEqualTo(1);
StreamMessageId id = stream.add(StreamAddArgs.entry("1", "1"));
stream.add(StreamAddArgs.entry("2", "2"));
stream.add(StreamAddArgs.entry("3", "3"));
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer1", id);
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.greaterThan(id));
assertThat(s2).isEmpty();
}
@ -535,7 +535,9 @@ public class RedissonStreamTest extends BaseTest {
t.start();
long start = System.currentTimeMillis();
Map<String, Map<StreamMessageId, Map<String, String>>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamMessageId(0), "test1", StreamMessageId.NEWEST);
Map<String, Map<StreamMessageId, Map<String, String>>> s = stream.read(StreamMultiReadArgs.greaterThan(new StreamMessageId(0), "test1", StreamMessageId.NEWEST)
.timeout(Duration.ofSeconds(5))
.count(2));
assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L);
assertThat(s).hasSize(1);
assertThat(s.get("test").get(new StreamMessageId(1))).isEqualTo(entries1);
@ -564,7 +566,7 @@ public class RedissonStreamTest extends BaseTest {
t.start();
long start = System.currentTimeMillis();
Map<StreamMessageId, Map<String, String>> s = stream.read(2, 4, TimeUnit.SECONDS, new StreamMessageId(0));
Map<StreamMessageId, Map<String, String>> s = stream.read(StreamReadArgs.greaterThan(new StreamMessageId(0)).count(2).timeout(Duration.ofSeconds(4)));
assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L);
assertThat(s).hasSize(1);
assertThat(s.get(new StreamMessageId(1))).isEqualTo(entries1);
@ -576,8 +578,9 @@ public class RedissonStreamTest extends BaseTest {
StreamMessageId id1 = stream2.add(StreamAddArgs.entry("33", "33"));
stream2.add(StreamAddArgs.entry("44", "44"));
Map<String, Map<StreamMessageId, Map<String, String>>> s2 = stream.read(5, TimeUnit.SECONDS, id0, Collections.singletonMap("test2", id1));
Map<String, Map<StreamMessageId, Map<String, String>>> s2 = stream.read(StreamMultiReadArgs.greaterThan(id0, "test2", id1)
.timeout(Duration.ofSeconds(5)));
assertThat(s2.values().iterator().next().values().iterator().next().keySet()).containsAnyOf("11", "22", "33", "44");
assertThat(s2.keySet()).containsExactlyInAnyOrder("test", "test2");
}
@ -603,7 +606,8 @@ public class RedissonStreamTest extends BaseTest {
@Test
public void testReadMultiKeysEmpty() {
RStream<String, String> stream = redisson.getStream("test2");
Map<String, Map<StreamMessageId, Map<String, String>>> s = stream.read(10, new StreamMessageId(0), "test1", new StreamMessageId(0));
Map<String, Map<StreamMessageId, Map<String, String>>> s = stream.read(StreamMultiReadArgs.greaterThan(new StreamMessageId(0), "test1", new StreamMessageId(0))
.count(10));
assertThat(s).isEmpty();
}
@ -621,8 +625,9 @@ public class RedissonStreamTest extends BaseTest {
entries2.put("5", "55");
entries2.put("6", "66");
stream2.add(StreamAddArgs.entries(entries2));
Map<String, Map<StreamMessageId, Map<String, String>>> s = stream2.read(10, new StreamMessageId(0), "test1", new StreamMessageId(0));
Map<String, Map<StreamMessageId, Map<String, String>>> s = stream2.read(StreamMultiReadArgs.greaterThan(new StreamMessageId(0), "test1", new StreamMessageId(0))
.count(10));
assertThat(s).hasSize(2);
assertThat(s.get("test1").values().iterator().next()).isEqualTo(entries1);
assertThat(s.get("test2").values().iterator().next()).isEqualTo(entries2);
@ -647,7 +652,7 @@ public class RedissonStreamTest extends BaseTest {
entries3.put("17", "07");
stream.add(new StreamMessageId(3), StreamAddArgs.entries(entries3).trim(TrimStrategy.MAXLEN, 1));
Map<StreamMessageId, Map<String, String>> result = stream.read(10, new StreamMessageId(0, 0));
Map<StreamMessageId, Map<String, String>> result = stream.read(StreamReadArgs.greaterThan(new StreamMessageId(0, 0)).count(10));
assertThat(result).hasSize(3);
assertThat(result.get(new StreamMessageId(4))).isNull();
assertThat(result.get(new StreamMessageId(1))).isEqualTo(entries1);
@ -663,7 +668,7 @@ public class RedissonStreamTest extends BaseTest {
entries1.put("3", "31");
stream.add(new StreamMessageId(1), StreamAddArgs.entries(entries1).trim(TrimStrategy.MAXLEN, 1));
Map<StreamMessageId, Map<String, String>> result = stream.read(10, new StreamMessageId(0, 0));
Map<StreamMessageId, Map<String, String>> result = stream.read(StreamReadArgs.greaterThan(new StreamMessageId(0, 0)).count(10));
assertThat(result).hasSize(1);
assertThat(result.get(new StreamMessageId(4))).isNull();
assertThat(result.get(new StreamMessageId(1))).isEqualTo(entries1);
@ -672,7 +677,7 @@ public class RedissonStreamTest extends BaseTest {
@Test
public void testReadEmpty() {
RStream<String, String> stream2 = redisson.getStream("test");
Map<StreamMessageId, Map<String, String>> result2 = stream2.read(10, new StreamMessageId(0, 0));
Map<StreamMessageId, Map<String, String>> result2 = stream2.read(StreamReadArgs.greaterThan(new StreamMessageId(0, 0)).count(10));
assertThat(result2).isEmpty();
}
@ -697,7 +702,7 @@ public class RedissonStreamTest extends BaseTest {
stream.add(id, StreamAddArgs.entries(entries).trim(TrimStrategy.MAXLEN, 10));
assertThat(stream.size()).isEqualTo(1);
Map<StreamMessageId, Map<String, String>> res = stream.read(new StreamMessageId(10, 42));
Map<StreamMessageId, Map<String, String>> res = stream.read(StreamReadArgs.greaterThan(new StreamMessageId(10, 42)));
assertThat(res.get(id).size()).isEqualTo(2);
entries.clear();
@ -725,7 +730,7 @@ public class RedissonStreamTest extends BaseTest {
stream.add(StreamAddArgs.entry("2", "2"));
stream.add(StreamAddArgs.entry("3", "3"));
Map<StreamMessageId, Map<String, String>> map = stream.readGroup("testGroup", "consumer1");
Map<StreamMessageId, Map<String, String>> map = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
assertThat(map.size()).isEqualTo(6);
List<StreamConsumer> s1 = stream.listConsumers("testGroup");
@ -734,7 +739,7 @@ public class RedissonStreamTest extends BaseTest {
assertThat(s1.get(0).getPending()).isEqualTo(6);
assertThat(s1.get(0).getIdleTime()).isLessThan(100L);
Map<StreamMessageId, Map<String, String>> map2 = stream.readGroup("testGroup2", "consumer2");
Map<StreamMessageId, Map<String, String>> map2 = stream.readGroup("testGroup2", "consumer2", StreamReadGroupArgs.neverDelivered());
assertThat(map2.size()).isEqualTo(6);
List<StreamConsumer> s2 = stream.listConsumers("testGroup2");

Loading…
Cancel
Save