From 7c532057f9a9161cc089877e001239514db53efe Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 13 Sep 2018 18:24:00 +0300 Subject: [PATCH] Redis Streams XGROUP, XPENDING, XACK, XCLAIM commands support added. #1490 --- .../java/org/redisson/RedissonReactive.java | 12 + .../java/org/redisson/RedissonStream.java | 165 ++++++ .../java/org/redisson/api/PendingEntry.java | 76 +++ .../java/org/redisson/api/PendingResult.java | 83 +++ .../main/java/org/redisson/api/RStream.java | 131 ++++- .../java/org/redisson/api/RStreamAsync.java | 132 ++++- .../org/redisson/api/RStreamReactive.java | 478 ++++++++++++++++++ .../redisson/api/RedissonReactiveClient.java | 26 + .../client/protocol/RedisCommands.java | 33 +- .../protocol/decoder/PendingEntryDecoder.java | 48 ++ .../decoder/PendingResultDecoder.java | 51 ++ .../reactive/RedissonStreamReactive.java | 471 +++++++++++++++++ .../java/org/redisson/RedissonStreamTest.java | 123 +++++ 13 files changed, 1824 insertions(+), 5 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/PendingEntry.java create mode 100644 redisson/src/main/java/org/redisson/api/PendingResult.java create mode 100644 redisson/src/main/java/org/redisson/api/RStreamReactive.java create mode 100644 redisson/src/main/java/org/redisson/client/protocol/decoder/PendingEntryDecoder.java create mode 100644 redisson/src/main/java/org/redisson/client/protocol/decoder/PendingResultDecoder.java create mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonStreamReactive.java diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 3172ffd61..040ebc6ea 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -52,6 +52,7 @@ import org.redisson.api.RSemaphoreReactive; import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetMultimapReactive; import org.redisson.api.RSetReactive; +import org.redisson.api.RStreamReactive; import org.redisson.api.RTopicReactive; import org.redisson.api.RTransactionReactive; import org.redisson.api.RedissonReactiveClient; @@ -92,6 +93,7 @@ import org.redisson.reactive.RedissonSemaphoreReactive; import org.redisson.reactive.RedissonSetCacheReactive; import org.redisson.reactive.RedissonSetMultimapReactive; import org.redisson.reactive.RedissonSetReactive; +import org.redisson.reactive.RedissonStreamReactive; import org.redisson.reactive.RedissonTopicReactive; import org.redisson.reactive.RedissonTransactionReactive; @@ -121,6 +123,16 @@ public class RedissonReactive implements RedissonReactiveClient { evictionScheduler = new EvictionScheduler(commandExecutor); codecProvider = config.getReferenceCodecProvider(); } + + @Override + public RStreamReactive getStream(String name) { + return new RedissonStreamReactive(commandExecutor, name); + } + + @Override + public RStreamReactive getStream(String name, Codec codec) { + return new RedissonStreamReactive(codec, commandExecutor, name); + } @Override public RGeoReactive getGeo(String name) { diff --git a/redisson/src/main/java/org/redisson/RedissonStream.java b/redisson/src/main/java/org/redisson/RedissonStream.java index a1d963e64..2c5e26d56 100644 --- a/redisson/src/main/java/org/redisson/RedissonStream.java +++ b/redisson/src/main/java/org/redisson/RedissonStream.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.redisson.api.PendingEntry; +import org.redisson.api.PendingResult; import org.redisson.api.RFuture; import org.redisson.api.RStream; import org.redisson.api.StreamId; @@ -59,6 +61,169 @@ public class RedissonStream extends RedissonExpirable implements RStream createGroupAsync(String groupName) { + return createGroupAsync(groupName, StreamId.NEWEST); + } + + @Override + public void createGroup(String groupName, StreamId id) { + get(createGroupAsync(groupName, id)); + } + + @Override + public RFuture createGroupAsync(String groupName, StreamId id) { + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "CREATE", getName(), groupName, id); + } + + @Override + public RFuture ackAsync(String groupName, StreamId... ids) { + List params = new ArrayList(); + params.add(getName()); + params.add(groupName); + for (StreamId id : ids) { + params.add(id); + } + + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XACK, params.toArray()); + } + + @Override + public Long ack(String groupName, StreamId... id) { + return get(ackAsync(groupName, id)); + } + + @Override + public RFuture listPendingAsync(String groupName) { + return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING, getName(), groupName); + } + + @Override + public PendingResult listPending(String groupName) { + return get(listPendingAsync(groupName)); + } + + @Override + public RFuture> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count, + String consumerName) { + return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, startId, endId, count, consumerName); + } + + @Override + public RFuture> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count) { + return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, startId, endId, count); + } + + @Override + public List listPending(String groupName, StreamId startId, StreamId endId, int count) { + return get(listPendingAsync(groupName, startId, endId, count)); + } + + @Override + public List listPending(String groupName, StreamId startId, StreamId endId, int count, + String consumerName) { + return get(listPendingAsync(groupName, startId, endId, count, consumerName)); + } + + @Override + public RFuture>> claimAsync(String groupName, String consumerName, long idleTime, + TimeUnit idleTimeUnit, StreamId... ids) { + List params = new ArrayList(); + params.add(getName()); + params.add(groupName); + params.add(consumerName); + params.add(idleTimeUnit.toMillis(idleTime)); + + for (StreamId id : ids) { + params.add(id.toString()); + } + + return commandExecutor.readAsync(getName(), codec, RedisCommands.XCLAIM, params.toArray()); + } + + @Override + public Map> claimPending(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, + StreamId... ids) { + return get(claimAsync(groupName, consumerName, idleTime, idleTimeUnit, ids)); + } + + @Override + public RFuture>> readGroupAsync(String groupName, String consumerName, StreamId... ids) { + return readGroupAsync(groupName, consumerName, 0, ids); + } + + @Override + public RFuture>> readGroupAsync(String groupName, String consumerName, int count, StreamId... ids) { + return readGroupAsync(groupName, consumerName, count, 0, null, ids); + } + + @Override + public RFuture>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, + StreamId... ids) { + return readGroupAsync(groupName, consumerName, 0, timeout, unit, ids); + } + + @Override + public RFuture>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, + StreamId... ids) { + List params = new ArrayList(); + params.add("GROUP"); + params.add(groupName); + params.add(consumerName); + + if (count > 0) { + params.add("COUNT"); + params.add(count); + } + + if (timeout > 0) { + params.add("BLOCK"); + params.add(toSeconds(timeout, unit)*1000); + } + + params.add("STREAMS"); + params.add(getName()); + + if (ids.length == 0) { + params.add(">"); + } + + for (StreamId id : ids) { + params.add(id.toString()); + } + + if (timeout > 0) { + return commandExecutor.readAsync(getName(), codec, RedisCommands.XREADGROUP_BLOCKING_SINGLE, params.toArray()); + } + return commandExecutor.readAsync(getName(), codec, RedisCommands.XREADGROUP_SINGLE, params.toArray()); + } + + @Override + public Map> readGroup(String groupName, String consumerName, StreamId... ids) { + return get(readGroupAsync(groupName, consumerName, ids)); + } + + @Override + public Map> readGroup(String groupName, String consumerName, int count, StreamId... ids) { + return get(readGroupAsync(groupName, consumerName, count, ids)); + } + + @Override + public Map> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId... ids) { + return get(readGroupAsync(groupName, consumerName, timeout, unit, ids)); + } + + @Override + public Map> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, + StreamId... ids) { + return get(readGroupAsync(groupName, consumerName, count, timeout, unit, ids)); + } @Override public StreamId addAll(Map entries) { diff --git a/redisson/src/main/java/org/redisson/api/PendingEntry.java b/redisson/src/main/java/org/redisson/api/PendingEntry.java new file mode 100644 index 000000000..16d2e268c --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/PendingEntry.java @@ -0,0 +1,76 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +/** + * Entry object for pending messages request. + * + * @author Nikita Koksharov + * + */ +public class PendingEntry { + + private StreamId id; + private String consumerName; + private long idleTime; + private long lastTimeDelivered; + + public PendingEntry(StreamId id, String consumerName, long idleTime, long lastTimeDelivered) { + super(); + this.id = id; + this.consumerName = consumerName; + this.idleTime = idleTime; + this.lastTimeDelivered = lastTimeDelivered; + } + + /** + * Returns stream id of message + * + * @return id + */ + public StreamId getId() { + return id; + } + + /** + * Returns name of consumer + * + * @return id + */ + public String getConsumerName() { + return consumerName; + } + + /** + * Returns milliseconds amount have passed since the last time + * the message was delivered to some consumer + * + * @return number + */ + public long getIdleTime() { + return idleTime; + } + + /** + * Returns number of times that a given message was delivered + * + * @return number + */ + public long getLastTimeDelivered() { + return lastTimeDelivered; + } + +} diff --git a/redisson/src/main/java/org/redisson/api/PendingResult.java b/redisson/src/main/java/org/redisson/api/PendingResult.java new file mode 100644 index 000000000..23f593bb6 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/PendingResult.java @@ -0,0 +1,83 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.io.Serializable; +import java.util.Map; + +/** + * Result object for pending messages request. + * + * @author Nikita Koksharov + * + */ +public class PendingResult implements Serializable { + + private static final long serialVersionUID = -5525031552305408248L; + + private long total; + private StreamId lowestId; + private StreamId highestId; + private Map consumerNames; + + public PendingResult() { + } + + public PendingResult(long total, StreamId lowestId, StreamId highestId, Map consumerNames) { + super(); + this.total = total; + this.lowestId = lowestId; + this.highestId = highestId; + this.consumerNames = consumerNames; + } + + /** + * Total amount of pending messages + * + * @return number + */ + public long getTotal() { + return total; + } + + /** + * Lowest stream id of pending messages + * + * @return number + */ + public StreamId getLowestId() { + return lowestId; + } + + /** + * Highest stream id of pending messages + * + * @return number + */ + public StreamId getHighestId() { + return highestId; + } + + /** + * Pending messages amount mapped by consumer name + * + * @return map + */ + public Map getConsumerNames() { + return consumerNames; + } + +} diff --git a/redisson/src/main/java/org/redisson/api/RStream.java b/redisson/src/main/java/org/redisson/api/RStream.java index db6452d6d..be47bdeb3 100644 --- a/redisson/src/main/java/org/redisson/api/RStream.java +++ b/redisson/src/main/java/org/redisson/api/RStream.java @@ -15,12 +15,12 @@ */ package org.redisson.api; -import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** - * Redis Stream implementation. + * Interface for Redis Stream object. *

* Requires Redis 5.0.0 and higher. * @@ -31,6 +31,133 @@ import java.util.concurrent.TimeUnit; */ public interface RStream extends RStreamAsync, RExpirable { + /** + * Creates consumer group by name. + * + * @param groupName - name of group + */ + void createGroup(String groupName); + + /** + * Creates consumer group by name and stream id. + * Only new messages after defined stream id will be available for consumers of this group. + *

+ * {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating + * + * @param groupName - name of group + * @param id - stream id + */ + void createGroup(String groupName, StreamId id); + + /** + * Marks pending messages by group name and stream ids as correctly processed. + * + * @param groupName - name of group + * @param ids - stream ids + * @return marked messages amount + */ + Long ack(String groupName, StreamId... ids); + + /** + * Returns pending messages by group name + * + * @param groupName - name of group + * @return result object + */ + PendingResult listPending(String groupName); + + /** + * Returns list of pending messages by group name. + * Limited by start stream id and end stream id and count. + *

+ * {@link StreamId#MAX} is used as max stream id + * {@link StreamId#MIN} is used as min stream id + * + * @param groupName - name of group + * @param startId - start stream id + * @param endId - end stream id + * @param count - amount of messages + * @return list + */ + List listPending(String groupName, StreamId startId, StreamId endId, int count); + + /** + * Returns list of pending messages by group name and consumer name. + * Limited by start stream id and end stream id and count. + *

+ * {@link StreamId#MAX} is used as max stream id + * {@link StreamId#MIN} is used as min stream id + * + * @param consumerName - name of consumer + * @param groupName - name of group + * @param startId - start stream id + * @param endId - end stream id + * @param count - amount of messages + * @return list + */ + List listPending(String groupName, StreamId startId, StreamId endId, int count, String consumerName); + + /** + * Transfers ownership of pending messages by id to a new consumer + * by name if idle time of messages is greater than defined value. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param idleTime - minimum idle time of messages + * @param idleTimeUnit - idle time unit + * @param ids - stream ids + * @return + */ + Map> claimPending(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids); + + /** + * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Map> readGroup(String groupName, String consumerName, StreamId ... ids); + + /** + * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Map> readGroup(String groupName, String consumerName, int count, StreamId ... ids); + + /** + * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * Wait for stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Map> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids); + + /** + * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * Wait for stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Map> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids); + /** * Returns number of entries in stream * diff --git a/redisson/src/main/java/org/redisson/api/RStreamAsync.java b/redisson/src/main/java/org/redisson/api/RStreamAsync.java index 343a3dbb5..a095a3aa6 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamAsync.java +++ b/redisson/src/main/java/org/redisson/api/RStreamAsync.java @@ -15,11 +15,12 @@ */ package org.redisson.api; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** - * Redis Stream implementation. + * Async interface for Redis Stream object. *

* Requires Redis 5.0.0 and higher. * @@ -30,6 +31,135 @@ import java.util.concurrent.TimeUnit; */ public interface RStreamAsync extends RExpirableAsync { + /** + * Creates consumer group by name. + * + * @param groupName - name of group + * @return void + */ + RFuture createGroupAsync(String groupName); + + /** + * Creates consumer group by name and stream id. + * Only new messages after defined stream id will be available for consumers of this group. + *

+ * {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating + * + * @param groupName - name of group + * @param id - stream id + * @return void + */ + RFuture createGroupAsync(String groupName, StreamId id); + + /** + * Marks pending messages by group name and stream ids as correctly processed. + * + * @param groupName - name of group + * @param ids - stream ids + * @return marked messages amount + */ + RFuture ackAsync(String groupName, StreamId... ids); + + /** + * Returns pending messages by group name + * + * @param groupName - name of group + * @return result object + */ + RFuture listPendingAsync(String groupName); + + /** + * Returns list of pending messages by group name. + * Limited by start stream id and end stream id and count. + *

+ * {@link StreamId#MAX} is used as max stream id + * {@link StreamId#MIN} is used as min stream id + * + * @param groupName - name of group + * @param startId - start stream id + * @param endId - end stream id + * @param count - amount of messages + * @return list + */ + RFuture> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count); + + /** + * Returns list of pending messages by group name and consumer name. + * Limited by start stream id and end stream id and count. + *

+ * {@link StreamId#MAX} is used as max stream id + * {@link StreamId#MIN} is used as min stream id + * + * @param consumerName - name of consumer + * @param groupName - name of group + * @param startId - start stream id + * @param endId - end stream id + * @param count - amount of messages + * @return list + */ + RFuture> listPendingAsync(String groupName, StreamId startId, StreamId endId, int count, String consumerName); + + /** + * Transfers ownership of pending messages by id to a new consumer + * by name if idle time of messages is greater than defined value. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param idleTime - minimum idle time of messages + * @param idleTimeUnit - idle time unit + * @param ids - stream ids + * @return + */ + RFuture>> claimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids); + + /** + * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + RFuture>> readGroupAsync(String groupName, String consumerName, StreamId ... ids); + + /** + * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + RFuture>> readGroupAsync(String groupName, String consumerName, int count, StreamId ... ids); + + /** + * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * Wait for stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + RFuture>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids); + + /** + * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * Wait for stream data availability for specified timeout interval. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + RFuture>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids); + /** * Returns number of entries in stream * diff --git a/redisson/src/main/java/org/redisson/api/RStreamReactive.java b/redisson/src/main/java/org/redisson/api/RStreamReactive.java new file mode 100644 index 000000000..aa956b530 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RStreamReactive.java @@ -0,0 +1,478 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; + +/** + * Reactive interface for Redis Stream object. + *

+ * Requires Redis 5.0.0 and higher. + * + * @author Nikita Koksharov + * + * @param key type + * @param value type + */ +public interface RStreamReactive extends RExpirableReactive { + + /** + * Creates consumer group by name. + * + * @param groupName - name of group + * @return void + */ + Publisher createGroup(String groupName); + + /** + * Creates consumer group by name and stream id. + * Only new messages after defined stream id will be available for consumers of this group. + *

+ * {@link StreamId#NEWEST} is used for messages arrived since the moment of group creating + * + * @param groupName - name of group + * @return void + */ + Publisher createGroup(String groupName, StreamId id); + + /** + * Marks pending messages by group name and stream ids as correctly processed. + * + * @param groupName - name of group + * @param ids - stream ids + * @return marked messages amount + */ + Publisher ack(String groupName, StreamId... ids); + + /** + * Returns pending messages by group name + * + * @param groupName - name of group + * @return result object + */ + Publisher listPending(String groupName); + + /** + * Returns list of pending messages by group name. + * Limited by start stream id and end stream id and count. + *

+ * {@link StreamId#MAX} is used as max stream id + * {@link StreamId#MIN} is used as min stream id + * + * @param groupName - name of group + * @param startId - start stream id + * @param endId - end stream id + * @param count - amount of messages + * @return list + */ + Publisher> listPending(String groupName, StreamId startId, StreamId endId, int count); + + /** + * Returns list of pending messages by group name and consumer name. + * Limited by start stream id and end stream id and count. + *

+ * {@link StreamId#MAX} is used as max stream id + * {@link StreamId#MIN} is used as min stream id + * + * @param consumerName - name of consumer + * @param groupName - name of group + * @param startId - start stream id + * @param endId - end stream id + * @param count - amount of messages + * @return list + */ + Publisher> listPending(String groupName, StreamId startId, StreamId endId, int count, String consumerName); + + /** + * Transfers ownership of pending messages by id to a new consumer + * by name if idle time of messages is greater than defined value. + * + * @param groupName - name of group + * @param consumerName - name of consumer + * @param idleTime - minimum idle time of messages + * @param idleTimeUnit - idle time unit + * @param ids - stream ids + * @return + */ + Publisher>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamId ... ids); + + /** + * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Publisher>> readGroup(String groupName, String consumerName, StreamId ... ids); + + /** + * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * + * @param count - stream data size limit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Publisher>> readGroup(String groupName, String consumerName, int count, StreamId ... ids); + + /** + * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * Wait for stream data availability for specified timeout interval. + * + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Publisher>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamId ... ids); + + /** + * Read stream data from groupName by consumerName and specified collection of Stream IDs. + * Wait for stream data availability for specified timeout interval. + * + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Publisher>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamId ... ids); + + + /** + * Returns number of entries in stream + * + * @return size of stream + */ + Publisher size(); + + /** + * Appends a new entry and returns generated Stream ID + * + * @param key - key of entry + * @param value - value of entry + * @return Stream ID + */ + Publisher add(K key, V value); + + /** + * Appends a new entry by specified Stream ID + * + * @param id - Stream ID + * @param key - key of entry + * @param value - value of entry + * @return void + */ + Publisher add(StreamId id, K key, V value); + + /** + * Appends a new entry and returns generated Stream ID. + * Trims stream to a specified trimLen size. + * If trimStrict is false then trims to few tens of entries more than specified length to trim. + * + * @param key - key of entry + * @param value - value of entry + * @param trimLen - length to trim + * @param trimStrict - if false then trims to few tens of entries more than specified length to trim + * @return Stream ID + */ + Publisher add(K key, V value, int trimLen, boolean trimStrict); + + /** + * Appends a new entry by specified Stream ID. + * Trims stream to a specified trimLen size. + * If trimStrict is false then trims to few tens of entries more than specified length to trim. + * + * @param id - Stream ID + * @param key - key of entry + * @param value - value of entry + * @param trimLen - length to trim + * @param trimStrict - if false then trims to few tens of entries more than specified length to trim + * @return void + */ + Publisher add(StreamId id, K key, V value, int trimLen, boolean trimStrict); + + /** + * Appends new entries and returns generated Stream ID + * + * @param entries - entries to add + * @return Stream ID + */ + Publisher addAll(Map entries); + + /** + * Appends new entries by specified Stream ID + * + * @param id - Stream ID + * @param entries - entries to add + * @return void + */ + Publisher addAll(StreamId id, Map entries); + + /** + * Appends new entries and returns generated Stream ID. + * Trims stream to a specified trimLen size. + * If trimStrict is false then trims to few tens of entries more than specified length to trim. + * + * @param entries - entries to add + * @param trimLen - length to trim + * @param trimStrict - if false then trims to few tens of entries more than specified length to trim + * @return Stream ID + */ + Publisher addAll(Map entries, int trimLen, boolean trimStrict); + + /** + * Appends new entries by specified Stream ID. + * Trims stream to a specified trimLen size. + * If trimStrict is false then trims to few tens of entries more than specified length to trim. + * + * @param id - Stream ID + * @param entries - entries to add + * @param trimLen - length to trim + * @param trimStrict - if false then trims to few tens of entries more than specified length to trim + * @return void + */ + Publisher addAll(StreamId id, Map entries, int trimLen, boolean trimStrict); + + /** + * Read stream data by specified collection of Stream IDs. + * + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Publisher>> read(StreamId ... ids); + + /** + * Read stream data by specified collection of Stream IDs. + * + * @param count - stream data size limit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Publisher>> read(int count, StreamId ... ids); + + /** + * Read stream data by specified collection of Stream IDs. + * Wait for stream data availability for specified timeout interval. + * + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Publisher>> read(long timeout, TimeUnit unit, StreamId ... ids); + + /** + * Read stream data by specified collection of Stream IDs. + * Wait for stream data availability for specified timeout interval. + * + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Publisher>> read(int count, long timeout, TimeUnit unit, StreamId ... ids); + + /** + * Read stream data by specified stream name including this stream. + * + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @return stream data mapped by key and Stream ID + */ + Publisher>>> read(StreamId id, String name2, StreamId id2); + + /** + * Read stream data by specified stream names including this stream. + * + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @param name3 - name of third stream + * @param id3 - id of third stream + * @return stream data mapped by key and Stream ID + */ + Publisher>>> read(StreamId id, String name2, StreamId id2, String name3, StreamId id3); + + /** + * Read stream data by specified stream id mapped by name including this stream. + * + * @param id - id of this stream + * @param nameToId - stream id mapped by name + * @return stream data mapped by key and Stream ID + */ + Publisher>>> read(StreamId id, Map nameToId); + + /** + * Read stream data by specified stream name including this stream. + * + * @param count - stream data size limit + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @return stream data mapped by key and Stream ID + */ + Publisher>>> read(int count, StreamId id, String name2, StreamId id2); + + /** + * Read stream data by specified stream names including this stream. + * + * @param count - stream data size limit + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @param name3 - name of third stream + * @param id3 - id of third stream + * @return stream data mapped by key and Stream ID + */ + Publisher>>> read(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + + /** + * Read stream data by specified stream id mapped by name including this stream. + * + * @param count - stream data size limit + * @param id - id of this stream + * @param nameToId - stream id mapped by name + * @return stream data mapped by key and Stream ID + */ + Publisher>>> read(int count, StreamId id, Map nameToId); + + /** + * Read stream data by specified stream name including this stream. + * Wait for the first stream data availability for specified timeout interval. + * + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @return stream data mapped by key and Stream ID + */ + Publisher>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2); + + /** + * Read stream data by specified stream names including this stream. + * Wait for the first stream data availability for specified timeout interval. + * + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @param name3 - name of third stream + * @param id3 - id of third stream + * @return stream data mapped by key and Stream ID + */ + Publisher>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + + /** + * Read stream data by specified stream id mapped by name including this stream. + * Wait for the first stream data availability for specified timeout interval. + * + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - id of this stream + * @param nameToId - stream id mapped by name + * @return stream data mapped by key and Stream ID + */ + Publisher>>> read(long timeout, TimeUnit unit, StreamId id, Map nameToId); + + /** + * Read stream data by specified stream name including this stream. + * Wait for the first stream data availability for specified timeout interval. + * + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @return stream data mapped by key and Stream ID + */ + Publisher>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2); + + /** + * Read stream data by specified stream names including this stream. + * Wait for the first stream data availability for specified timeout interval. + * + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @param name3 - name of third stream + * @param id3 - id of third stream + * @return stream data mapped by key and Stream ID + */ + Publisher>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + + /** + * Read stream data by specified stream id mapped by name including this stream. + * Wait for the first stream data availability for specified timeout interval. + * + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - id of this stream + * @param nameToId - stream id mapped by name + * @return stream data mapped by key and Stream ID + */ + Publisher>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map nameToId); + + /** + * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). + * + * @param startId - start Stream ID + * @param endId - end Stream ID + * @return stream data mapped by Stream ID + */ + Publisher>> range(StreamId startId, StreamId endId); + + /** + * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). + * + * @param count - stream data size limit + * @param startId - start Stream ID + * @param endId - end Stream ID + * @return stream data mapped by Stream ID + */ + Publisher>> range(int count, StreamId startId, StreamId endId); + + /** + * Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included). + * + * @param startId - start Stream ID + * @param endId - end Stream ID + * @return stream data mapped by Stream ID + */ + Publisher>> rangeReversed(StreamId startId, StreamId endId); + + /** + * Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included). + * + * @param count - stream data size limit + * @param startId - start Stream ID + * @param endId - end Stream ID + * @return stream data mapped by Stream ID + */ + Publisher>> rangeReversed(int count, StreamId startId, StreamId endId); + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index 12d5709ea..b096cb3a3 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -30,6 +30,32 @@ import org.redisson.config.Config; */ public interface RedissonReactiveClient { + /** + * Returns stream instance by name + *

+ * Requires Redis 5.0.0 and higher. + * + * @param type of key + * @param type of value + * @param name of stream + * @return RStream object + */ + RStreamReactive getStream(String name); + + /** + * Returns stream instance by name + * using provided codec for entries. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param type of key + * @param type of value + * @param name - name of stream + * @param codec - codec for entry + * @return RStream object + */ + RStreamReactive getStream(String name, Codec codec); + /** * Returns geospatial items holder instance by name. * diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index c83063fd5..10dfbbdf8 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -59,6 +59,8 @@ import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapJoinDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; +import org.redisson.client.protocol.decoder.PendingEntryDecoder; +import org.redisson.client.protocol.decoder.PendingResultDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetPolledObjectDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder; @@ -329,7 +331,7 @@ public interface RedisCommands { new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()), new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX))); - RedisCommand>> XREAD_BLOCKING = new RedisCommand>>("XREAD", + RedisCommand>>> XREAD_BLOCKING = new RedisCommand>>>("XREAD", new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()), new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX))); @@ -340,10 +342,37 @@ public interface RedisCommands { RedisCommand>> XREAD_BLOCKING_SINGLE = new RedisCommand>>("XREAD", new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()), new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(), new StreamResultDecoder())); - + + RedisCommand>>> XREADGROUP = new RedisCommand>>>("XREADGROUP", + new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()), + new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX))); + + RedisCommand>> XREADGROUP_BLOCKING = new RedisCommand>>("XREADGROUP", + new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()), + new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX))); + + RedisCommand>> XREADGROUP_SINGLE = new RedisCommand>>("XREADGROUP", + new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()), + new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(), new StreamResultDecoder())); + + RedisCommand>>> XCLAIM = new RedisCommand>>>("XCLAIM", + new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET, new StreamIdConvertor()), + new ObjectMapJoinDecoder())); + + RedisCommand>> XREADGROUP_BLOCKING_SINGLE = new RedisCommand>>("XREADGROUP", + new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()), + new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(), new StreamResultDecoder())); + + RedisStrictCommand XADD = new RedisStrictCommand("XADD", new StreamIdConvertor()); + RedisStrictCommand XGROUP = new RedisStrictCommand("XGROUP", new VoidReplayConvertor()); RedisStrictCommand XADD_VOID = new RedisStrictCommand("XADD", new VoidReplayConvertor()); RedisStrictCommand XLEN = new RedisStrictCommand("XLEN"); + RedisStrictCommand XACK = new RedisStrictCommand("XACK"); + RedisCommand XPENDING = new RedisCommand("XPENDING", + new ListMultiDecoder(new ObjectListReplayDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new PendingResultDecoder())); + RedisCommand XPENDING_ENTRIES = new RedisCommand("XPENDING", + new PendingEntryDecoder()); RedisStrictCommand TOUCH_LONG = new RedisStrictCommand("TOUCH", new LongReplayConvertor()); RedisStrictCommand TOUCH = new RedisStrictCommand("TOUCH", new BooleanReplayConvertor()); diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/PendingEntryDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/PendingEntryDecoder.java new file mode 100644 index 000000000..501d09893 --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/PendingEntryDecoder.java @@ -0,0 +1,48 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.util.List; + +import org.redisson.api.PendingEntry; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.convertor.StreamIdConvertor; + +/** + * + * @author Nikita Koksharov + * + */ +public class PendingEntryDecoder implements MultiDecoder { + + private final StreamIdConvertor convertor = new StreamIdConvertor(); + + @Override + public Decoder getDecoder(int paramNum, State state) { + return null; + } + + @Override + public Object decode(List parts, State state) { + if (parts.isEmpty() || parts.get(0) instanceof PendingEntry) { + return parts; + } + return new PendingEntry(convertor.convert(parts.get(0)), parts.get(1).toString(), + Long.valueOf(parts.get(2).toString()), Long.valueOf(parts.get(3).toString())); + } + +} diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/PendingResultDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/PendingResultDecoder.java new file mode 100644 index 000000000..c5db984b5 --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/PendingResultDecoder.java @@ -0,0 +1,51 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.redisson.api.PendingResult; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.convertor.StreamIdConvertor; + +/** + * + * @author Nikita Koksharov + * + */ +public class PendingResultDecoder implements MultiDecoder { + + private final StreamIdConvertor convertor = new StreamIdConvertor(); + + @Override + public Decoder getDecoder(int paramNum, State state) { + return null; + } + + @Override + public Object decode(List parts, State state) { + Map consumerNames = new LinkedHashMap(); + List> customerParts = (List>) parts.get(3); + for (List mapping : customerParts) { + consumerNames.put(mapping.get(0), Long.valueOf(mapping.get(1))); + } + return new PendingResult((long)parts.get(0), convertor.convert(parts.get(1)), convertor.convert(parts.get(2)), consumerNames); + } + +} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonStreamReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonStreamReactive.java new file mode 100644 index 000000000..63bf86dd4 --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonStreamReactive.java @@ -0,0 +1,471 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; +import org.redisson.RedissonStream; +import org.redisson.api.PendingEntry; +import org.redisson.api.PendingResult; +import org.redisson.api.RFuture; +import org.redisson.api.RStream; +import org.redisson.api.RStreamAsync; +import org.redisson.api.RStreamReactive; +import org.redisson.api.StreamId; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandReactiveExecutor; + +import reactor.fn.Supplier; + +/** + * + * @author Nikita Koksharov + * + * @param type of key + * @param type of value + */ +public class RedissonStreamReactive extends RedissonExpirableReactive implements RStreamReactive { + + RStreamAsync instance; + + public RedissonStreamReactive(CommandReactiveExecutor commandExecutor, String name) { + super(commandExecutor, name, new RedissonStream(commandExecutor, name)); + this.instance = (RStream) super.instance; + } + + public RedissonStreamReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { + super(codec, commandExecutor, name, new RedissonStream(codec, commandExecutor, name)); + this.instance = (RStream) super.instance; + } + + @Override + public Publisher createGroup(final String groupName) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.createGroupAsync(groupName); + } + }); + } + + @Override + public Publisher createGroup(final String groupName, final StreamId id) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.createGroupAsync(groupName, id); + } + }); + } + + @Override + public Publisher ack(final String groupName, final StreamId... ids) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.ackAsync(groupName, ids); + } + }); + } + + @Override + public Publisher listPending(final String groupName) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.listPendingAsync(groupName); + } + }); + } + + @Override + public Publisher> listPending(final String groupName, final StreamId startId, final StreamId endId, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.listPendingAsync(groupName, startId, endId, count); + } + }); + } + + @Override + public Publisher> listPending(final String groupName, final StreamId startId, final StreamId endId, final int count, + final String consumerName) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.listPendingAsync(groupName, startId, endId, count, consumerName); + } + }); + } + + @Override + public Publisher>> claim(final String groupName, final String consumerName, final long idleTime, + final TimeUnit idleTimeUnit, final StreamId... ids) { + return reactive(new Supplier>>>() { + @Override + public RFuture>> get() { + return instance.claimAsync(groupName, consumerName, idleTime, idleTimeUnit, ids); + } + }); + } + + @Override + public Publisher>> readGroup(final String groupName, final String consumerName, final StreamId... ids) { + return reactive(new Supplier>>>() { + @Override + public RFuture>> get() { + return instance.readGroupAsync(groupName, consumerName, ids); + } + }); + } + + @Override + public Publisher>> readGroup(final String groupName, final String consumerName, final int count, + final StreamId... ids) { + return reactive(new Supplier>>>() { + @Override + public RFuture>> get() { + return instance.readGroupAsync(groupName, consumerName, count, ids); + } + }); + } + + @Override + public Publisher>> readGroup(final String groupName, final String consumerName, final long timeout, + final TimeUnit unit, final StreamId... ids) { + return reactive(new Supplier>>>() { + @Override + public RFuture>> get() { + return instance.readGroupAsync(groupName, consumerName, timeout, unit, ids); + } + }); + } + + @Override + public Publisher>> readGroup(final String groupName, final String consumerName, final int count, final long timeout, + final TimeUnit unit, final StreamId... ids) { + return reactive(new Supplier>>>() { + @Override + public RFuture>> get() { + return instance.readGroupAsync(groupName, consumerName, timeout, unit, ids); + } + }); + } + + @Override + public Publisher size() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sizeAsync(); + } + }); + } + + @Override + public Publisher add(final K key, final V value) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAsync(key, value); + } + }); + } + + @Override + public Publisher add(final StreamId id, final K key, final V value) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAsync(id, key, value); + } + }); + } + + @Override + public Publisher add(final K key, final V value, final int trimLen, final boolean trimStrict) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAsync(key, value, trimLen, trimStrict); + } + }); + } + + @Override + public Publisher add(final StreamId id, final K key, final V value, final int trimLen, final boolean trimStrict) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAsync(id, key, value, trimLen, trimStrict); + } + }); + } + + @Override + public Publisher addAll(final Map entries) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAllAsync(entries); + } + }); + } + + @Override + public Publisher addAll(final StreamId id, final Map entries) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAllAsync(id, entries); + } + }); + } + + @Override + public Publisher addAll(final Map entries, final int trimLen, final boolean trimStrict) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAllAsync(entries, trimLen, trimStrict); + } + }); + } + + @Override + public Publisher addAll(final StreamId id, final Map entries, final int trimLen, final boolean trimStrict) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAllAsync(id, entries, trimLen, trimStrict); + } + }); + } + + @Override + public Publisher>> read(final StreamId... ids) { + return reactive(new Supplier>>>() { + @Override + public RFuture>> get() { + return instance.readAsync(ids); + } + }); + } + + @Override + public Publisher>> read(final int count, final StreamId... ids) { + return reactive(new Supplier>>>() { + @Override + public RFuture>> get() { + return instance.readAsync(count, ids); + } + }); + } + + @Override + public Publisher>> read(final long timeout, final TimeUnit unit, final StreamId... ids) { + return reactive(new Supplier>>>() { + @Override + public RFuture>> get() { + return instance.readAsync(timeout, unit, ids); + } + }); + } + + @Override + public Publisher>> read(final int count, final long timeout, final TimeUnit unit, final StreamId... ids) { + return reactive(new Supplier>>>() { + @Override + public RFuture>> get() { + return instance.readAsync(count, timeout, unit, ids); + } + }); + } + + @Override + public Publisher>>> read(final StreamId id, final String name2, final StreamId id2) { + return reactive(new Supplier>>>>() { + @Override + public RFuture>>> get() { + return instance.readAsync(id, name2, id2); + } + }); + } + + @Override + public Publisher>>> read(final StreamId id, final String name2, final StreamId id2, final String name3, + final StreamId id3) { + return reactive(new Supplier>>>>() { + @Override + public RFuture>>> get() { + return instance.readAsync(id, name2, id2, name3, id3); + } + }); + } + + @Override + public Publisher>>> read(final StreamId id, final Map nameToId) { + return reactive(new Supplier>>>>() { + @Override + public RFuture>>> get() { + return instance.readAsync(id, nameToId); + } + }); + } + + @Override + public Publisher>>> read(final int count, final StreamId id, final String name2, final StreamId id2) { + return reactive(new Supplier>>>>() { + @Override + public RFuture>>> get() { + return instance.readAsync(count, id, name2, id2); + } + }); + } + + @Override + public Publisher>>> read(final int count, final StreamId id, final String name2, final StreamId id2, + final String name3, final StreamId id3) { + return reactive(new Supplier>>>>() { + @Override + public RFuture>>> get() { + return instance.readAsync(count, id, name2, id2, name3, id3); + } + }); + } + + @Override + public Publisher>>> read(final int count, final StreamId id, + final Map nameToId) { + return reactive(new Supplier>>>>() { + @Override + public RFuture>>> get() { + return instance.readAsync(count, id, nameToId); + } + }); + } + + @Override + public Publisher>>> read(final long timeout, final TimeUnit unit, final StreamId id, final String name2, + final StreamId id2) { + return reactive(new Supplier>>>>() { + @Override + public RFuture>>> get() { + return instance.readAsync(timeout, unit, id, name2, id2); + } + }); + } + + @Override + public Publisher>>> read(final long timeout, final TimeUnit unit, final StreamId id, final String name2, + final StreamId id2, final String name3, final StreamId id3) { + return reactive(new Supplier>>>>() { + @Override + public RFuture>>> get() { + return instance.readAsync(timeout, unit, id, name2, id2, name3, id3); + } + }); + } + + @Override + public Publisher>>> read(final long timeout, final TimeUnit unit, final StreamId id, + final Map nameToId) { + return reactive(new Supplier>>>>() { + @Override + public RFuture>>> get() { + return instance.readAsync(timeout, unit, id, nameToId); + } + }); + } + + @Override + public Publisher>>> read(final int count, final long timeout, final TimeUnit unit, final StreamId id, + final String name2, final StreamId id2) { + return reactive(new Supplier>>>>() { + @Override + public RFuture>>> get() { + return instance.readAsync(count, timeout, unit, id, name2, id2); + } + }); + } + + @Override + public Publisher>>> read(final int count, final long timeout, final TimeUnit unit, final StreamId id, + final String name2, final StreamId id2, final String name3, final StreamId id3) { + return reactive(new Supplier>>>>() { + @Override + public RFuture>>> get() { + return instance.readAsync(count, timeout, unit, id, name2, id2, name3, id3); + } + }); + } + + @Override + public Publisher>>> read(final int count, final long timeout, final TimeUnit unit, final StreamId id, + final Map nameToId) { + return reactive(new Supplier>>>>() { + @Override + public RFuture>>> get() { + return instance.readAsync(count, timeout, unit, id, nameToId); + } + }); + } + + @Override + public Publisher>> range(final StreamId startId, final StreamId endId) { + return reactive(new Supplier>>>() { + @Override + public RFuture>> get() { + return instance.rangeAsync(startId, endId); + } + }); + } + + @Override + public Publisher>> range(final int count, final StreamId startId, final StreamId endId) { + return reactive(new Supplier>>>() { + @Override + public RFuture>> get() { + return instance.rangeAsync(count, startId, endId); + } + }); + } + + @Override + public Publisher>> rangeReversed(final StreamId startId, final StreamId endId) { + return reactive(new Supplier>>>() { + @Override + public RFuture>> get() { + return instance.rangeReversedAsync(startId, endId); + } + }); + } + + @Override + public Publisher>> rangeReversed(final int count, final StreamId startId, final StreamId endId) { + return reactive(new Supplier>>>() { + @Override + public RFuture>> get() { + return instance.rangeReversedAsync(startId, endId); + } + }); + } + +} diff --git a/redisson/src/test/java/org/redisson/RedissonStreamTest.java b/redisson/src/test/java/org/redisson/RedissonStreamTest.java index 567842fa1..924538652 100644 --- a/redisson/src/test/java/org/redisson/RedissonStreamTest.java +++ b/redisson/src/test/java/org/redisson/RedissonStreamTest.java @@ -4,15 +4,138 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.Test; +import org.redisson.api.PendingEntry; +import org.redisson.api.PendingResult; import org.redisson.api.RStream; import org.redisson.api.StreamId; public class RedissonStreamTest extends BaseTest { + @Test + public void testClaim() { + RStream stream = redisson.getStream("test"); + + stream.add("0", "0"); + + stream.createGroup("testGroup"); + + StreamId id1 = stream.add("1", "1"); + StreamId id2 = stream.add("2", "2"); + + Map> s = stream.readGroup("testGroup", "consumer1"); + assertThat(s.size()).isEqualTo(2); + + StreamId id3 = stream.add("3", "33"); + StreamId id4 = stream.add("4", "44"); + + Map> s2 = stream.readGroup("testGroup", "consumer2"); + assertThat(s2.size()).isEqualTo(2); + + Map> res = stream.claimPending("testGroup", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4); + assertThat(res.size()).isEqualTo(2); + assertThat(res.keySet()).containsExactly(id3, id4); + for (Map map : res.values()) { + assertThat(map.keySet()).containsAnyOf("3", "4"); + assertThat(map.values()).containsAnyOf("33", "44"); + } + } + + @Test + public void testPending() { + RStream stream = redisson.getStream("test"); + + stream.add("0", "0"); + + stream.createGroup("testGroup"); + + StreamId id1 = stream.add("1", "1"); + StreamId id2 = stream.add("2", "2"); + + Map> s = stream.readGroup("testGroup", "consumer1"); + assertThat(s.size()).isEqualTo(2); + + StreamId id3 = stream.add("3", "3"); + StreamId id4 = stream.add("4", "4"); + + Map> s2 = stream.readGroup("testGroup", "consumer2"); + assertThat(s2.size()).isEqualTo(2); + + PendingResult pi = stream.listPending("testGroup"); + assertThat(pi.getLowestId()).isEqualTo(id1); + assertThat(pi.getHighestId()).isEqualTo(id4); + assertThat(pi.getTotal()).isEqualTo(4); + assertThat(pi.getConsumerNames().keySet()).containsExactly("consumer1", "consumer2"); + + List list = stream.listPending("testGroup", StreamId.MIN, StreamId.MAX, 10); + assertThat(list.size()).isEqualTo(4); + for (PendingEntry pendingEntry : list) { + assertThat(pendingEntry.getId()).isIn(id1, id2, id3, id4); + assertThat(pendingEntry.getConsumerName()).isIn("consumer1", "consumer2"); + assertThat(pendingEntry.getLastTimeDelivered()).isOne(); + } + + List list2 = stream.listPending("testGroup", StreamId.MIN, StreamId.MAX, 10, "consumer1"); + assertThat(list2.size()).isEqualTo(2); + for (PendingEntry pendingEntry : list2) { + assertThat(pendingEntry.getId()).isIn(id1, id2); + assertThat(pendingEntry.getConsumerName()).isEqualTo("consumer1"); + assertThat(pendingEntry.getLastTimeDelivered()).isOne(); + } + } + + @Test + public void testAck() { + RStream stream = redisson.getStream("test"); + + stream.add("0", "0"); + + stream.createGroup("testGroup"); + + StreamId id1 = stream.add("1", "1"); + StreamId id2 = stream.add("2", "2"); + + Map> s = stream.readGroup("testGroup", "consumer1"); + assertThat(s.size()).isEqualTo(2); + + assertThat(stream.ack("testGroup", id1, id2)).isEqualTo(2); + } + + @Test + public void testReadGroup() { + RStream stream = redisson.getStream("test"); + + StreamId id0 = stream.add("0", "0"); + + stream.createGroup("testGroup", id0); + + stream.add("1", "1"); + stream.add("2", "2"); + stream.add("3", "3"); + + Map> s = stream.readGroup("testGroup", "consumer1"); + assertThat(s.values().iterator().next().keySet()).containsAnyOf("1", "2", "3"); + assertThat(s.size()).isEqualTo(3); + + stream.add("1", "1"); + stream.add("2", "2"); + stream.add("3", "3"); + + Map> s1 = stream.readGroup("testGroup", "consumer1", 1); + assertThat(s1.size()).isEqualTo(1); + + StreamId id = stream.add("1", "1"); + stream.add("2", "2"); + stream.add("3", "3"); + + Map> s2 = stream.readGroup("testGroup", "consumer1", id); + assertThat(s2.size()).isEqualTo(2); + } + @Test public void testRangeReversed() { RStream stream = redisson.getStream("test");