Redis Streams XGROUP, XPENDING, XACK, XCLAIM commands support added. #1490

pull/1639/head
Nikita 6 years ago
parent f574b60315
commit 7c532057f9

@ -52,6 +52,7 @@ import org.redisson.api.RSemaphoreReactive;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetMultimapReactive;
import org.redisson.api.RSetReactive;
import org.redisson.api.RStreamReactive;
import org.redisson.api.RTopicReactive;
import org.redisson.api.RTransactionReactive;
import org.redisson.api.RedissonReactiveClient;
@ -92,6 +93,7 @@ import org.redisson.reactive.RedissonSemaphoreReactive;
import org.redisson.reactive.RedissonSetCacheReactive;
import org.redisson.reactive.RedissonSetMultimapReactive;
import org.redisson.reactive.RedissonSetReactive;
import org.redisson.reactive.RedissonStreamReactive;
import org.redisson.reactive.RedissonTopicReactive;
import org.redisson.reactive.RedissonTransactionReactive;
@ -121,6 +123,16 @@ public class RedissonReactive implements RedissonReactiveClient {
evictionScheduler = new EvictionScheduler(commandExecutor);
codecProvider = config.getReferenceCodecProvider();
}
@Override
public <K, V> RStreamReactive<K, V> getStream(String name) {
return new RedissonStreamReactive<K, V>(commandExecutor, name);
}
@Override
public <K, V> RStreamReactive<K, V> getStream(String name, Codec codec) {
return new RedissonStreamReactive<K, V>(codec, commandExecutor, name);
}
@Override
public <V> RGeoReactive<V> getGeo(String name) {

@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.redisson.api.PendingEntry;
import org.redisson.api.PendingResult;
import org.redisson.api.RFuture;
import org.redisson.api.RStream;
import org.redisson.api.StreamId;
@ -59,6 +61,169 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
throw new NullPointerException("value can't be null");
}
}
@Override
public void createGroup(String groupName) {
get(createGroupAsync(groupName));
}
@Override
public RFuture<Void> createGroupAsync(String groupName) {
return createGroupAsync(groupName, StreamId.NEWEST);
}
@Override
public void createGroup(String groupName, StreamId id) {
get(createGroupAsync(groupName, id));
}
@Override
public RFuture<Void> createGroupAsync(String groupName, StreamId id) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "CREATE", getName(), groupName, id);
}
@Override
public RFuture<Long> ackAsync(String groupName, StreamId... ids) {
List<Object> params = new ArrayList<Object>();
params.add(getName());
params.add(groupName);
for (StreamId id : ids) {
params.add(id);
}
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XACK, params.toArray());
}
@Override
public Long ack(String groupName, StreamId... id) {
return get(ackAsync(groupName, id));
}
@Override
public RFuture<PendingResult> listPendingAsync(String groupName) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING, getName(), groupName);
}
@Override
public PendingResult listPending(String groupName) {
return get(listPendingAsync(groupName));
}
@Override
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count,
String consumerName) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, startId, endId, count, consumerName);
}
@Override
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, startId, endId, count);
}
@Override
public List<PendingEntry> listPending(String groupName, StreamId startId, StreamId endId, int count) {
return get(listPendingAsync(groupName, startId, endId, count));
}
@Override
public List<PendingEntry> listPending(String groupName, StreamId startId, StreamId endId, int count,
String consumerName) {
return get(listPendingAsync(groupName, startId, endId, count, consumerName));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> claimAsync(String groupName, String consumerName, long idleTime,
TimeUnit idleTimeUnit, StreamId... ids) {
List<Object> params = new ArrayList<Object>();
params.add(getName());
params.add(groupName);
params.add(consumerName);
params.add(idleTimeUnit.toMillis(idleTime));
for (StreamId id : ids) {
params.add(id.toString());
}
return commandExecutor.readAsync(getName(), codec, RedisCommands.XCLAIM, params.toArray());
}
@Override
public Map<StreamId, Map<K, V>> claimPending(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit,
StreamId... ids) {
return get(claimAsync(groupName, consumerName, idleTime, idleTimeUnit, ids));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamId... ids) {
return readGroupAsync(groupName, consumerName, 0, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, StreamId... ids) {
return readGroupAsync(groupName, consumerName, count, 0, null, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit,
StreamId... ids) {
return readGroupAsync(groupName, consumerName, 0, timeout, unit, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit,
StreamId... ids) {
List<Object> params = new ArrayList<Object>();
params.add("GROUP");
params.add(groupName);
params.add(consumerName);
if (count > 0) {
params.add("COUNT");
params.add(count);
}
if (timeout > 0) {
params.add("BLOCK");
params.add(toSeconds(timeout, unit)*1000);
}
params.add("STREAMS");
params.add(getName());
if (ids.length == 0) {
params.add(">");
}
for (StreamId id : ids) {
params.add(id.toString());
}
if (timeout > 0) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREADGROUP_BLOCKING_SINGLE, params.toArray());
}
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREADGROUP_SINGLE, params.toArray());
}
@Override
public Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, StreamId... ids) {
return get(readGroupAsync(groupName, consumerName, ids));
}
@Override
public Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamId... ids) {
return get(readGroupAsync(groupName, consumerName, count, ids));
}
@Override
public Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId... ids) {
return get(readGroupAsync(groupName, consumerName, timeout, unit, ids));
}
@Override
public Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit,
StreamId... ids) {
return get(readGroupAsync(groupName, consumerName, count, timeout, unit, ids));
}
@Override
public StreamId addAll(Map<K, V> entries) {

@ -0,0 +1,76 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
/**
* Entry object for pending messages request.
*
* @author Nikita Koksharov
*
*/
public class PendingEntry {
private StreamId id;
private String consumerName;
private long idleTime;
private long lastTimeDelivered;
public PendingEntry(StreamId id, String consumerName, long idleTime, long lastTimeDelivered) {
super();
this.id = id;
this.consumerName = consumerName;
this.idleTime = idleTime;
this.lastTimeDelivered = lastTimeDelivered;
}
/**
* Returns stream id of message
*
* @return id
*/
public StreamId getId() {
return id;
}
/**
* Returns name of consumer
*
* @return id
*/
public String getConsumerName() {
return consumerName;
}
/**
* Returns milliseconds amount have passed since the last time
* the message was delivered to some consumer
*
* @return number
*/
public long getIdleTime() {
return idleTime;
}
/**
* Returns number of times that a given message was delivered
*
* @return number
*/
public long getLastTimeDelivered() {
return lastTimeDelivered;
}
}

@ -0,0 +1,83 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.io.Serializable;
import java.util.Map;
/**
* Result object for pending messages request.
*
* @author Nikita Koksharov
*
*/
public class PendingResult implements Serializable {
private static final long serialVersionUID = -5525031552305408248L;
private long total;
private StreamId lowestId;
private StreamId highestId;
private Map<String, Long> consumerNames;
public PendingResult() {
}
public PendingResult(long total, StreamId lowestId, StreamId highestId, Map<String, Long> consumerNames) {
super();
this.total = total;
this.lowestId = lowestId;
this.highestId = highestId;
this.consumerNames = consumerNames;
}
/**
* Total amount of pending messages
*
* @return number
*/
public long getTotal() {
return total;
}
/**
* Lowest stream id of pending messages
*
* @return number
*/
public StreamId getLowestId() {
return lowestId;
}
/**
* Highest stream id of pending messages
*
* @return number
*/
public StreamId getHighestId() {
return highestId;
}
/**
* Pending messages amount mapped by consumer name
*
* @return map
*/
public Map<String, Long> getConsumerNames() {
return consumerNames;
}
}

@ -15,12 +15,12 @@
*/
package org.redisson.api;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Redis Stream implementation.
* Interface for Redis Stream object.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
@ -31,6 +31,133 @@ import java.util.concurrent.TimeUnit;
*/
public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
/**
* Creates consumer group by name.
*
* @param groupName - name of group
*/
void createGroup(String groupName);
/**
* Creates consumer group by name and stream id.
* Only new messages after defined stream <code>id</code> will be available for consumers of this group.
* <p>
* {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating
*
* @param groupName - name of group
* @param id - stream id
*/
void createGroup(String groupName, StreamId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
*
* @param groupName - name of group
* @param ids - stream ids
* @return marked messages amount
*/
Long ack(String groupName, StreamId... ids);
/**
* Returns pending messages by group name
*
* @param groupName - name of group
* @return result object
*/
PendingResult listPending(String groupName);
/**
* Returns list of pending messages by group name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
*
* @param groupName - name of group
* @param startId - start stream id
* @param endId - end stream id
* @param count - amount of messages
* @return list
*/
List<PendingEntry> listPending(String groupName, StreamId startId, StreamId endId, int count);
/**
* Returns list of pending messages by group name and consumer name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
*
* @param consumerName - name of consumer
* @param groupName - name of group
* @param startId - start stream id
* @param endId - end stream id
* @param count - amount of messages
* @return list
*/
List<PendingEntry> listPending(String groupName, StreamId startId, StreamId endId, int count, String consumerName);
/**
* Transfers ownership of pending messages by id to a new consumer
* by name if idle time of messages is greater than defined value.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param ids - stream ids
* @return
*/
Map<StreamId, Map<K, V>> claimPending(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, StreamId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Map<StreamId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids);
/**
* Returns number of entries in stream
*

@ -15,11 +15,12 @@
*/
package org.redisson.api;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Redis Stream implementation.
* Async interface for Redis Stream object.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
@ -30,6 +31,135 @@ import java.util.concurrent.TimeUnit;
*/
public interface RStreamAsync<K, V> extends RExpirableAsync {
/**
* Creates consumer group by name.
*
* @param groupName - name of group
* @return void
*/
RFuture<Void> createGroupAsync(String groupName);
/**
* Creates consumer group by name and stream id.
* Only new messages after defined stream <code>id</code> will be available for consumers of this group.
* <p>
* {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating
*
* @param groupName - name of group
* @param id - stream id
* @return void
*/
RFuture<Void> createGroupAsync(String groupName, StreamId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
*
* @param groupName - name of group
* @param ids - stream ids
* @return marked messages amount
*/
RFuture<Long> ackAsync(String groupName, StreamId... ids);
/**
* Returns pending messages by group name
*
* @param groupName - name of group
* @return result object
*/
RFuture<PendingResult> listPendingAsync(String groupName);
/**
* Returns list of pending messages by group name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
*
* @param groupName - name of group
* @param startId - start stream id
* @param endId - end stream id
* @param count - amount of messages
* @return list
*/
RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count);
/**
* Returns list of pending messages by group name and consumer name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
*
* @param consumerName - name of consumer
* @param groupName - name of group
* @param startId - start stream id
* @param endId - end stream id
* @param count - amount of messages
* @return list
*/
RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count, String consumerName);
/**
* Transfers ownership of pending messages by id to a new consumer
* by name if idle time of messages is greater than defined value.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param ids - stream ids
* @return
*/
RFuture<Map<StreamId, Map<K, V>>> claimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, StreamId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids);
/**
* Returns number of entries in stream
*

@ -0,0 +1,478 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
/**
* Reactive interface for Redis Stream object.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public interface RStreamReactive<K, V> extends RExpirableReactive {
/**
* Creates consumer group by name.
*
* @param groupName - name of group
* @return void
*/
Publisher<Void> createGroup(String groupName);
/**
* Creates consumer group by name and stream id.
* Only new messages after defined stream <code>id</code> will be available for consumers of this group.
* <p>
* {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating
*
* @param groupName - name of group
* @return void
*/
Publisher<Void> createGroup(String groupName, StreamId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
*
* @param groupName - name of group
* @param ids - stream ids
* @return marked messages amount
*/
Publisher<Long> ack(String groupName, StreamId... ids);
/**
* Returns pending messages by group name
*
* @param groupName - name of group
* @return result object
*/
Publisher<PendingResult> listPending(String groupName);
/**
* Returns list of pending messages by group name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
*
* @param groupName - name of group
* @param startId - start stream id
* @param endId - end stream id
* @param count - amount of messages
* @return list
*/
Publisher<List<PendingEntry>> listPending(String groupName, StreamId startId, StreamId endId, int count);
/**
* Returns list of pending messages by group name and consumer name.
* Limited by start stream id and end stream id and count.
* <p>
* {@link StreamId#MAX} is used as max stream id
* {@link StreamId#MIN} is used as min stream id
*
* @param consumerName - name of consumer
* @param groupName - name of group
* @param startId - start stream id
* @param endId - end stream id
* @param count - amount of messages
* @return list
*/
Publisher<List<PendingEntry>> listPending(String groupName, StreamId startId, StreamId endId, int count, String consumerName);
/**
* Transfers ownership of pending messages by id to a new consumer
* by name if idle time of messages is greater than defined value.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @param idleTime - minimum idle time of messages
* @param idleTimeUnit - idle time unit
* @param ids - stream ids
* @return
*/
Publisher<Map<StreamId, Map<K, V>>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
*
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
*
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids);
/**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids);
/**
* Returns number of entries in stream
*
* @return size of stream
*/
Publisher<Long> size();
/**
* Appends a new entry and returns generated Stream ID
*
* @param key - key of entry
* @param value - value of entry
* @return Stream ID
*/
Publisher<StreamId> add(K key, V value);
/**
* Appends a new entry by specified Stream ID
*
* @param id - Stream ID
* @param key - key of entry
* @param value - value of entry
* @return void
*/
Publisher<Void> add(StreamId id, K key, V value);
/**
* Appends a new entry and returns generated Stream ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param key - key of entry
* @param value - value of entry
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
Publisher<StreamId> add(K key, V value, int trimLen, boolean trimStrict);
/**
* Appends a new entry by specified Stream ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param id - Stream ID
* @param key - key of entry
* @param value - value of entry
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
Publisher<Void> add(StreamId id, K key, V value, int trimLen, boolean trimStrict);
/**
* Appends new entries and returns generated Stream ID
*
* @param entries - entries to add
* @return Stream ID
*/
Publisher<StreamId> addAll(Map<K, V> entries);
/**
* Appends new entries by specified Stream ID
*
* @param id - Stream ID
* @param entries - entries to add
* @return void
*/
Publisher<Void> addAll(StreamId id, Map<K, V> entries);
/**
* Appends new entries and returns generated Stream ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param entries - entries to add
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
Publisher<StreamId> addAll(Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Appends new entries by specified Stream ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param id - Stream ID
* @param entries - entries to add
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
Publisher<Void> addAll(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream IDs.
*
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> read(StreamId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
*
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> read(int count, StreamId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId ... ids);
/**
* Read stream data by specified stream name including this stream.
*
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, String name2, StreamId id2);
/**
* Read stream data by specified stream names including this stream.
*
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, String name2, StreamId id2, String name3, StreamId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
*
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(StreamId id, Map<String, StreamId> nameToId);
/**
* Read stream data by specified stream name including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, String name2, StreamId id2);
/**
* Read stream data by specified stream names including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(int count, StreamId id, Map<String, StreamId> nameToId);
/**
* Read stream data by specified stream name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
/**
* Read stream data by specified stream names including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
/**
* Read stream data by specified stream name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
/**
* Read stream data by specified stream names including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
*
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> range(StreamId startId, StreamId endId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
*
* @param count - stream data size limit
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> range(int count, StreamId startId, StreamId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
*
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> rangeReversed(StreamId startId, StreamId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
*
* @param count - stream data size limit
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Publisher<Map<StreamId, Map<K, V>>> rangeReversed(int count, StreamId startId, StreamId endId);
}

@ -30,6 +30,32 @@ import org.redisson.config.Config;
*/
public interface RedissonReactiveClient {
/**
* Returns stream instance by <code>name</code>
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param <K> type of key
* @param <V> type of value
* @param name of stream
* @return RStream object
*/
<K, V> RStreamReactive<K, V> getStream(String name);
/**
* Returns stream instance by <code>name</code>
* using provided <code>codec</code> for entries.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of stream
* @param codec - codec for entry
* @return RStream object
*/
<K, V> RStreamReactive<K, V> getStream(String name, Codec codec);
/**
* Returns geospatial items holder instance by <code>name</code>.
*

@ -59,6 +59,8 @@ import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapJoinDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.PendingEntryDecoder;
import org.redisson.client.protocol.decoder.PendingResultDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetPolledObjectDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder;
@ -329,7 +331,7 @@ public interface RedisCommands {
new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()),
new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX)));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREAD_BLOCKING = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREAD",
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XREAD_BLOCKING = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XREAD",
new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()),
new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX)));
@ -340,10 +342,37 @@ public interface RedisCommands {
RedisCommand<Map<StreamId, Map<Object, Object>>> XREAD_BLOCKING_SINGLE = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREAD",
new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()),
new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(), new StreamResultDecoder()));
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XREADGROUP = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XREADGROUP",
new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()),
new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX)));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREADGROUP_BLOCKING = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREADGROUP",
new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()),
new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX)));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREADGROUP_SINGLE = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREADGROUP",
new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()),
new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(), new StreamResultDecoder()));
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XCLAIM = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XCLAIM",
new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET, new StreamIdConvertor()),
new ObjectMapJoinDecoder()));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREADGROUP_BLOCKING_SINGLE = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREADGROUP",
new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()),
new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(), new StreamResultDecoder()));
RedisStrictCommand<StreamId> XADD = new RedisStrictCommand<StreamId>("XADD", new StreamIdConvertor());
RedisStrictCommand<Void> XGROUP = new RedisStrictCommand<Void>("XGROUP", new VoidReplayConvertor());
RedisStrictCommand<Void> XADD_VOID = new RedisStrictCommand<Void>("XADD", new VoidReplayConvertor());
RedisStrictCommand<Long> XLEN = new RedisStrictCommand<Long>("XLEN");
RedisStrictCommand<Long> XACK = new RedisStrictCommand<Long>("XACK");
RedisCommand<Object> XPENDING = new RedisCommand<Object>("XPENDING",
new ListMultiDecoder(new ObjectListReplayDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new PendingResultDecoder()));
RedisCommand<Object> XPENDING_ENTRIES = new RedisCommand<Object>("XPENDING",
new PendingEntryDecoder());
RedisStrictCommand<Long> TOUCH_LONG = new RedisStrictCommand<Long>("TOUCH", new LongReplayConvertor());
RedisStrictCommand<Boolean> TOUCH = new RedisStrictCommand<Boolean>("TOUCH", new BooleanReplayConvertor());

@ -0,0 +1,48 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.api.PendingEntry;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.convertor.StreamIdConvertor;
/**
*
* @author Nikita Koksharov
*
*/
public class PendingEntryDecoder implements MultiDecoder<Object> {
private final StreamIdConvertor convertor = new StreamIdConvertor();
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
@Override
public Object decode(List<Object> parts, State state) {
if (parts.isEmpty() || parts.get(0) instanceof PendingEntry) {
return parts;
}
return new PendingEntry(convertor.convert(parts.get(0)), parts.get(1).toString(),
Long.valueOf(parts.get(2).toString()), Long.valueOf(parts.get(3).toString()));
}
}

@ -0,0 +1,51 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.redisson.api.PendingResult;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.convertor.StreamIdConvertor;
/**
*
* @author Nikita Koksharov
*
*/
public class PendingResultDecoder implements MultiDecoder<Object> {
private final StreamIdConvertor convertor = new StreamIdConvertor();
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
@Override
public Object decode(List<Object> parts, State state) {
Map<String, Long> consumerNames = new LinkedHashMap<String, Long>();
List<List<String>> customerParts = (List<List<String>>) parts.get(3);
for (List<String> mapping : customerParts) {
consumerNames.put(mapping.get(0), Long.valueOf(mapping.get(1)));
}
return new PendingResult((long)parts.get(0), convertor.convert(parts.get(1)), convertor.convert(parts.get(2)), consumerNames);
}
}

@ -0,0 +1,471 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.RedissonStream;
import org.redisson.api.PendingEntry;
import org.redisson.api.PendingResult;
import org.redisson.api.RFuture;
import org.redisson.api.RStream;
import org.redisson.api.RStreamAsync;
import org.redisson.api.RStreamReactive;
import org.redisson.api.StreamId;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
/**
*
* @author Nikita Koksharov
*
* @param <K> type of key
* @param <V> type of value
*/
public class RedissonStreamReactive<K, V> extends RedissonExpirableReactive implements RStreamReactive<K, V> {
RStreamAsync<K, V> instance;
public RedissonStreamReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name, new RedissonStream<K, V>(commandExecutor, name));
this.instance = (RStream<K, V>) super.instance;
}
public RedissonStreamReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name, new RedissonStream<K, V>(codec, commandExecutor, name));
this.instance = (RStream<K, V>) super.instance;
}
@Override
public Publisher<Void> createGroup(final String groupName) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.createGroupAsync(groupName);
}
});
}
@Override
public Publisher<Void> createGroup(final String groupName, final StreamId id) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.createGroupAsync(groupName, id);
}
});
}
@Override
public Publisher<Long> ack(final String groupName, final StreamId... ids) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.ackAsync(groupName, ids);
}
});
}
@Override
public Publisher<PendingResult> listPending(final String groupName) {
return reactive(new Supplier<RFuture<PendingResult>>() {
@Override
public RFuture<PendingResult> get() {
return instance.listPendingAsync(groupName);
}
});
}
@Override
public Publisher<List<PendingEntry>> listPending(final String groupName, final StreamId startId, final StreamId endId, final int count) {
return reactive(new Supplier<RFuture<List<PendingEntry>>>() {
@Override
public RFuture<List<PendingEntry>> get() {
return instance.listPendingAsync(groupName, startId, endId, count);
}
});
}
@Override
public Publisher<List<PendingEntry>> listPending(final String groupName, final StreamId startId, final StreamId endId, final int count,
final String consumerName) {
return reactive(new Supplier<RFuture<List<PendingEntry>>>() {
@Override
public RFuture<List<PendingEntry>> get() {
return instance.listPendingAsync(groupName, startId, endId, count, consumerName);
}
});
}
@Override
public Publisher<Map<StreamId, Map<K, V>>> claim(final String groupName, final String consumerName, final long idleTime,
final TimeUnit idleTimeUnit, final StreamId... ids) {
return reactive(new Supplier<RFuture<Map<StreamId, Map<K, V>>>>() {
@Override
public RFuture<Map<StreamId, Map<K, V>>> get() {
return instance.claimAsync(groupName, consumerName, idleTime, idleTimeUnit, ids);
}
});
}
@Override
public Publisher<Map<StreamId, Map<K, V>>> readGroup(final String groupName, final String consumerName, final StreamId... ids) {
return reactive(new Supplier<RFuture<Map<StreamId, Map<K, V>>>>() {
@Override
public RFuture<Map<StreamId, Map<K, V>>> get() {
return instance.readGroupAsync(groupName, consumerName, ids);
}
});
}
@Override
public Publisher<Map<StreamId, Map<K, V>>> readGroup(final String groupName, final String consumerName, final int count,
final StreamId... ids) {
return reactive(new Supplier<RFuture<Map<StreamId, Map<K, V>>>>() {
@Override
public RFuture<Map<StreamId, Map<K, V>>> get() {
return instance.readGroupAsync(groupName, consumerName, count, ids);
}
});
}
@Override
public Publisher<Map<StreamId, Map<K, V>>> readGroup(final String groupName, final String consumerName, final long timeout,
final TimeUnit unit, final StreamId... ids) {
return reactive(new Supplier<RFuture<Map<StreamId, Map<K, V>>>>() {
@Override
public RFuture<Map<StreamId, Map<K, V>>> get() {
return instance.readGroupAsync(groupName, consumerName, timeout, unit, ids);
}
});
}
@Override
public Publisher<Map<StreamId, Map<K, V>>> readGroup(final String groupName, final String consumerName, final int count, final long timeout,
final TimeUnit unit, final StreamId... ids) {
return reactive(new Supplier<RFuture<Map<StreamId, Map<K, V>>>>() {
@Override
public RFuture<Map<StreamId, Map<K, V>>> get() {
return instance.readGroupAsync(groupName, consumerName, timeout, unit, ids);
}
});
}
@Override
public Publisher<Long> size() {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.sizeAsync();
}
});
}
@Override
public Publisher<StreamId> add(final K key, final V value) {
return reactive(new Supplier<RFuture<StreamId>>() {
@Override
public RFuture<StreamId> get() {
return instance.addAsync(key, value);
}
});
}
@Override
public Publisher<Void> add(final StreamId id, final K key, final V value) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.addAsync(id, key, value);
}
});
}
@Override
public Publisher<StreamId> add(final K key, final V value, final int trimLen, final boolean trimStrict) {
return reactive(new Supplier<RFuture<StreamId>>() {
@Override
public RFuture<StreamId> get() {
return instance.addAsync(key, value, trimLen, trimStrict);
}
});
}
@Override
public Publisher<Void> add(final StreamId id, final K key, final V value, final int trimLen, final boolean trimStrict) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.addAsync(id, key, value, trimLen, trimStrict);
}
});
}
@Override
public Publisher<StreamId> addAll(final Map<K, V> entries) {
return reactive(new Supplier<RFuture<StreamId>>() {
@Override
public RFuture<StreamId> get() {
return instance.addAllAsync(entries);
}
});
}
@Override
public Publisher<Void> addAll(final StreamId id, final Map<K, V> entries) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.addAllAsync(id, entries);
}
});
}
@Override
public Publisher<StreamId> addAll(final Map<K, V> entries, final int trimLen, final boolean trimStrict) {
return reactive(new Supplier<RFuture<StreamId>>() {
@Override
public RFuture<StreamId> get() {
return instance.addAllAsync(entries, trimLen, trimStrict);
}
});
}
@Override
public Publisher<Void> addAll(final StreamId id, final Map<K, V> entries, final int trimLen, final boolean trimStrict) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.addAllAsync(id, entries, trimLen, trimStrict);
}
});
}
@Override
public Publisher<Map<StreamId, Map<K, V>>> read(final StreamId... ids) {
return reactive(new Supplier<RFuture<Map<StreamId, Map<K, V>>>>() {
@Override
public RFuture<Map<StreamId, Map<K, V>>> get() {
return instance.readAsync(ids);
}
});
}
@Override
public Publisher<Map<StreamId, Map<K, V>>> read(final int count, final StreamId... ids) {
return reactive(new Supplier<RFuture<Map<StreamId, Map<K, V>>>>() {
@Override
public RFuture<Map<StreamId, Map<K, V>>> get() {
return instance.readAsync(count, ids);
}
});
}
@Override
public Publisher<Map<StreamId, Map<K, V>>> read(final long timeout, final TimeUnit unit, final StreamId... ids) {
return reactive(new Supplier<RFuture<Map<StreamId, Map<K, V>>>>() {
@Override
public RFuture<Map<StreamId, Map<K, V>>> get() {
return instance.readAsync(timeout, unit, ids);
}
});
}
@Override
public Publisher<Map<StreamId, Map<K, V>>> read(final int count, final long timeout, final TimeUnit unit, final StreamId... ids) {
return reactive(new Supplier<RFuture<Map<StreamId, Map<K, V>>>>() {
@Override
public RFuture<Map<StreamId, Map<K, V>>> get() {
return instance.readAsync(count, timeout, unit, ids);
}
});
}
@Override
public Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(final StreamId id, final String name2, final StreamId id2) {
return reactive(new Supplier<RFuture<Map<String, Map<StreamId, Map<K, V>>>>>() {
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> get() {
return instance.readAsync(id, name2, id2);
}
});
}
@Override
public Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(final StreamId id, final String name2, final StreamId id2, final String name3,
final StreamId id3) {
return reactive(new Supplier<RFuture<Map<String, Map<StreamId, Map<K, V>>>>>() {
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> get() {
return instance.readAsync(id, name2, id2, name3, id3);
}
});
}
@Override
public Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(final StreamId id, final Map<String, StreamId> nameToId) {
return reactive(new Supplier<RFuture<Map<String, Map<StreamId, Map<K, V>>>>>() {
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> get() {
return instance.readAsync(id, nameToId);
}
});
}
@Override
public Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(final int count, final StreamId id, final String name2, final StreamId id2) {
return reactive(new Supplier<RFuture<Map<String, Map<StreamId, Map<K, V>>>>>() {
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> get() {
return instance.readAsync(count, id, name2, id2);
}
});
}
@Override
public Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(final int count, final StreamId id, final String name2, final StreamId id2,
final String name3, final StreamId id3) {
return reactive(new Supplier<RFuture<Map<String, Map<StreamId, Map<K, V>>>>>() {
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> get() {
return instance.readAsync(count, id, name2, id2, name3, id3);
}
});
}
@Override
public Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(final int count, final StreamId id,
final Map<String, StreamId> nameToId) {
return reactive(new Supplier<RFuture<Map<String, Map<StreamId, Map<K, V>>>>>() {
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> get() {
return instance.readAsync(count, id, nameToId);
}
});
}
@Override
public Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(final long timeout, final TimeUnit unit, final StreamId id, final String name2,
final StreamId id2) {
return reactive(new Supplier<RFuture<Map<String, Map<StreamId, Map<K, V>>>>>() {
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> get() {
return instance.readAsync(timeout, unit, id, name2, id2);
}
});
}
@Override
public Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(final long timeout, final TimeUnit unit, final StreamId id, final String name2,
final StreamId id2, final String name3, final StreamId id3) {
return reactive(new Supplier<RFuture<Map<String, Map<StreamId, Map<K, V>>>>>() {
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> get() {
return instance.readAsync(timeout, unit, id, name2, id2, name3, id3);
}
});
}
@Override
public Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(final long timeout, final TimeUnit unit, final StreamId id,
final Map<String, StreamId> nameToId) {
return reactive(new Supplier<RFuture<Map<String, Map<StreamId, Map<K, V>>>>>() {
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> get() {
return instance.readAsync(timeout, unit, id, nameToId);
}
});
}
@Override
public Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(final int count, final long timeout, final TimeUnit unit, final StreamId id,
final String name2, final StreamId id2) {
return reactive(new Supplier<RFuture<Map<String, Map<StreamId, Map<K, V>>>>>() {
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> get() {
return instance.readAsync(count, timeout, unit, id, name2, id2);
}
});
}
@Override
public Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(final int count, final long timeout, final TimeUnit unit, final StreamId id,
final String name2, final StreamId id2, final String name3, final StreamId id3) {
return reactive(new Supplier<RFuture<Map<String, Map<StreamId, Map<K, V>>>>>() {
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> get() {
return instance.readAsync(count, timeout, unit, id, name2, id2, name3, id3);
}
});
}
@Override
public Publisher<Map<String, Map<StreamId, Map<K, V>>>> read(final int count, final long timeout, final TimeUnit unit, final StreamId id,
final Map<String, StreamId> nameToId) {
return reactive(new Supplier<RFuture<Map<String, Map<StreamId, Map<K, V>>>>>() {
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> get() {
return instance.readAsync(count, timeout, unit, id, nameToId);
}
});
}
@Override
public Publisher<Map<StreamId, Map<K, V>>> range(final StreamId startId, final StreamId endId) {
return reactive(new Supplier<RFuture<Map<StreamId, Map<K, V>>>>() {
@Override
public RFuture<Map<StreamId, Map<K, V>>> get() {
return instance.rangeAsync(startId, endId);
}
});
}
@Override
public Publisher<Map<StreamId, Map<K, V>>> range(final int count, final StreamId startId, final StreamId endId) {
return reactive(new Supplier<RFuture<Map<StreamId, Map<K, V>>>>() {
@Override
public RFuture<Map<StreamId, Map<K, V>>> get() {
return instance.rangeAsync(count, startId, endId);
}
});
}
@Override
public Publisher<Map<StreamId, Map<K, V>>> rangeReversed(final StreamId startId, final StreamId endId) {
return reactive(new Supplier<RFuture<Map<StreamId, Map<K, V>>>>() {
@Override
public RFuture<Map<StreamId, Map<K, V>>> get() {
return instance.rangeReversedAsync(startId, endId);
}
});
}
@Override
public Publisher<Map<StreamId, Map<K, V>>> rangeReversed(final int count, final StreamId startId, final StreamId endId) {
return reactive(new Supplier<RFuture<Map<StreamId, Map<K, V>>>>() {
@Override
public RFuture<Map<StreamId, Map<K, V>>> get() {
return instance.rangeReversedAsync(startId, endId);
}
});
}
}

@ -4,15 +4,138 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.redisson.api.PendingEntry;
import org.redisson.api.PendingResult;
import org.redisson.api.RStream;
import org.redisson.api.StreamId;
public class RedissonStreamTest extends BaseTest {
@Test
public void testClaim() {
RStream<String, String> stream = redisson.getStream("test");
stream.add("0", "0");
stream.createGroup("testGroup");
StreamId id1 = stream.add("1", "1");
StreamId id2 = stream.add("2", "2");
Map<StreamId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
StreamId id3 = stream.add("3", "33");
StreamId id4 = stream.add("4", "44");
Map<StreamId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
assertThat(s2.size()).isEqualTo(2);
Map<StreamId, Map<String, String>> res = stream.claimPending("testGroup", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4);
assertThat(res.size()).isEqualTo(2);
assertThat(res.keySet()).containsExactly(id3, id4);
for (Map<String, String> map : res.values()) {
assertThat(map.keySet()).containsAnyOf("3", "4");
assertThat(map.values()).containsAnyOf("33", "44");
}
}
@Test
public void testPending() {
RStream<String, String> stream = redisson.getStream("test");
stream.add("0", "0");
stream.createGroup("testGroup");
StreamId id1 = stream.add("1", "1");
StreamId id2 = stream.add("2", "2");
Map<StreamId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
StreamId id3 = stream.add("3", "3");
StreamId id4 = stream.add("4", "4");
Map<StreamId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
assertThat(s2.size()).isEqualTo(2);
PendingResult pi = stream.listPending("testGroup");
assertThat(pi.getLowestId()).isEqualTo(id1);
assertThat(pi.getHighestId()).isEqualTo(id4);
assertThat(pi.getTotal()).isEqualTo(4);
assertThat(pi.getConsumerNames().keySet()).containsExactly("consumer1", "consumer2");
List<PendingEntry> list = stream.listPending("testGroup", StreamId.MIN, StreamId.MAX, 10);
assertThat(list.size()).isEqualTo(4);
for (PendingEntry pendingEntry : list) {
assertThat(pendingEntry.getId()).isIn(id1, id2, id3, id4);
assertThat(pendingEntry.getConsumerName()).isIn("consumer1", "consumer2");
assertThat(pendingEntry.getLastTimeDelivered()).isOne();
}
List<PendingEntry> list2 = stream.listPending("testGroup", StreamId.MIN, StreamId.MAX, 10, "consumer1");
assertThat(list2.size()).isEqualTo(2);
for (PendingEntry pendingEntry : list2) {
assertThat(pendingEntry.getId()).isIn(id1, id2);
assertThat(pendingEntry.getConsumerName()).isEqualTo("consumer1");
assertThat(pendingEntry.getLastTimeDelivered()).isOne();
}
}
@Test
public void testAck() {
RStream<String, String> stream = redisson.getStream("test");
stream.add("0", "0");
stream.createGroup("testGroup");
StreamId id1 = stream.add("1", "1");
StreamId id2 = stream.add("2", "2");
Map<StreamId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
assertThat(stream.ack("testGroup", id1, id2)).isEqualTo(2);
}
@Test
public void testReadGroup() {
RStream<String, String> stream = redisson.getStream("test");
StreamId id0 = stream.add("0", "0");
stream.createGroup("testGroup", id0);
stream.add("1", "1");
stream.add("2", "2");
stream.add("3", "3");
Map<StreamId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.values().iterator().next().keySet()).containsAnyOf("1", "2", "3");
assertThat(s.size()).isEqualTo(3);
stream.add("1", "1");
stream.add("2", "2");
stream.add("3", "3");
Map<StreamId, Map<String, String>> s1 = stream.readGroup("testGroup", "consumer1", 1);
assertThat(s1.size()).isEqualTo(1);
StreamId id = stream.add("1", "1");
stream.add("2", "2");
stream.add("3", "3");
Map<StreamId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer1", id);
assertThat(s2.size()).isEqualTo(2);
}
@Test
public void testRangeReversed() {
RStream<String, String> stream = redisson.getStream("test");

Loading…
Cancel
Save