diff --git a/redisson/src/main/java/org/redisson/RedissonStream.java b/redisson/src/main/java/org/redisson/RedissonStream.java index af4879c20..fad793c1e 100644 --- a/redisson/src/main/java/org/redisson/RedissonStream.java +++ b/redisson/src/main/java/org/redisson/RedissonStream.java @@ -55,7 +55,30 @@ public class RedissonStream extends RedissonExpirable implements RStream createGroupAsync(StreamCreateGroupArgs args) { + StreamCreateGroupParams pps = (StreamCreateGroupParams) args; + List params = new LinkedList<>(); + params.add("CREATE"); + params.add(getRawName()); + params.add(pps.getName()); + params.add(pps.getId()); + if (pps.isMakeStream()) { + params.add("MKSTREAM"); + } + if (pps.getEntriesRead() > 0) { + params.add("ENTRIESREAD"); + params.add(pps.getEntriesRead()); + } + return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XGROUP, params.toArray()); + } + @Override public void createGroup(String groupName) { get(createGroupAsync(groupName)); diff --git a/redisson/src/main/java/org/redisson/api/RStream.java b/redisson/src/main/java/org/redisson/api/RStream.java index 078e4377a..c77cf3bb3 100644 --- a/redisson/src/main/java/org/redisson/api/RStream.java +++ b/redisson/src/main/java/org/redisson/api/RStream.java @@ -33,6 +33,18 @@ import java.util.concurrent.TimeUnit; */ public interface RStream extends RStreamAsync, RExpirable { + /** + * Creates consumer group. + *

+ * Usage examples: + *

+     * StreamMessageId id = stream.createGroup(StreamCreateGroupArgs.name("test").id(id).makeStream());
+     * 
+ * + * @param args method arguments object + */ + void createGroup(StreamCreateGroupArgs args); + /** * Creates consumer group by name. * Only new messages will be available for consumers of this group. diff --git a/redisson/src/main/java/org/redisson/api/RStreamAsync.java b/redisson/src/main/java/org/redisson/api/RStreamAsync.java index 87bb323bd..4f49c8664 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamAsync.java +++ b/redisson/src/main/java/org/redisson/api/RStreamAsync.java @@ -33,6 +33,18 @@ import java.util.concurrent.TimeUnit; */ public interface RStreamAsync extends RExpirableAsync { + /** + * Creates consumer group. + *

+ * Usage examples: + *

+     * StreamMessageId id = stream.createGroup(StreamCreateGroupArgs.name("test").id(id).makeStream());
+     * 
+ * + * @param args method arguments object + */ + RFuture createGroupAsync(StreamCreateGroupArgs args); + /** * Creates consumer group by name. * diff --git a/redisson/src/main/java/org/redisson/api/RStreamReactive.java b/redisson/src/main/java/org/redisson/api/RStreamReactive.java index 634e32ec1..cbef6fafc 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamReactive.java +++ b/redisson/src/main/java/org/redisson/api/RStreamReactive.java @@ -34,6 +34,18 @@ import reactor.core.publisher.Mono; */ public interface RStreamReactive extends RExpirableReactive { + /** + * Creates consumer group. + *

+ * Usage examples: + *

+     * StreamMessageId id = stream.createGroup(StreamCreateGroupArgs.name("test").id(id).makeStream());
+     * 
+ * + * @param args method arguments object + */ + Mono createGroup(StreamCreateGroupArgs args); + /** * Creates consumer group by name. * diff --git a/redisson/src/main/java/org/redisson/api/RStreamRx.java b/redisson/src/main/java/org/redisson/api/RStreamRx.java index a22bc182a..b34bfdd7a 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamRx.java +++ b/redisson/src/main/java/org/redisson/api/RStreamRx.java @@ -35,6 +35,18 @@ import org.redisson.api.stream.*; */ public interface RStreamRx extends RExpirableRx { + /** + * Creates consumer group. + *

+ * Usage examples: + *

+     * StreamMessageId id = stream.createGroup(StreamCreateGroupArgs.name("test").id(id).makeStream());
+     * 
+ * + * @param args method arguments object + */ + Completable createGroup(StreamCreateGroupArgs args); + /** * Creates consumer group by name. * diff --git a/redisson/src/main/java/org/redisson/api/stream/StreamCreateGroupArgs.java b/redisson/src/main/java/org/redisson/api/stream/StreamCreateGroupArgs.java new file mode 100644 index 000000000..e19bbacfb --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/stream/StreamCreateGroupArgs.java @@ -0,0 +1,64 @@ +/** + * Copyright (c) 2013-2022 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.stream; + +import org.redisson.api.StreamMessageId; + +/** + * Arguments object for {@link org.redisson.api.RStream#createGroup(StreamCreateGroupArgs)} method. + * + * @author Nikita Koksharov + * + */ +public interface StreamCreateGroupArgs { + + /** + * Defines entries_read argument + * + * @param amount entries_read argument + * @return arguments object + */ + StreamCreateGroupArgs entriesRead(int amount); + + /** + * Defines whether a stream should be created if it doesn't exist. + * + * @return arguments object + */ + StreamCreateGroupArgs makeStream(); + + /** + * Defines Stream Message ID. + * Only new messages after defined stream id will + * be available for consumers of this group. + * + * @param id Stream Message ID + * @return arguments object + */ + StreamCreateGroupArgs id(StreamMessageId id); + + /** + * Defines name of group. + * Only new messages will be available for consumers of this group. + * + * @param value name of group + * @return arguments object + */ + static StreamCreateGroupArgs name(String value) { + return new StreamCreateGroupParams(value); + } + +} diff --git a/redisson/src/main/java/org/redisson/api/stream/StreamCreateGroupParams.java b/redisson/src/main/java/org/redisson/api/stream/StreamCreateGroupParams.java new file mode 100644 index 000000000..dcf1c7769 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/stream/StreamCreateGroupParams.java @@ -0,0 +1,69 @@ +/** + * Copyright (c) 2013-2022 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.stream; + +import org.redisson.api.StreamMessageId; + +/** + * + * @author Nikita Koksharov + * + */ +public final class StreamCreateGroupParams implements StreamCreateGroupArgs { + + private final String name; + private boolean makeStream; + private int entriesRead; + private StreamMessageId id = StreamMessageId.NEWEST; + + public StreamCreateGroupParams(String value) { + this.name = value; + } + + @Override + public StreamCreateGroupArgs entriesRead(int amount) { + this.entriesRead = amount; + return this; + } + + @Override + public StreamCreateGroupArgs makeStream() { + this.makeStream = true; + return this; + } + + @Override + public StreamCreateGroupArgs id(StreamMessageId id) { + this.id = id; + return this; + } + + public String getName() { + return name; + } + + public boolean isMakeStream() { + return makeStream; + } + + public int getEntriesRead() { + return entriesRead; + } + + public StreamMessageId getId() { + return id; + } +}