Feature - Add entriesRead and makeStream parameters to RStream.createGroup() method #5083 #5081

pull/5099/head
Nikita Koksharov 2 years ago
parent 77c315ddb6
commit ade3b1e405

@ -56,6 +56,29 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
}
@Override
public void createGroup(StreamCreateGroupArgs args) {
get(createGroupAsync(args));
}
@Override
public RFuture<Void> createGroupAsync(StreamCreateGroupArgs args) {
StreamCreateGroupParams pps = (StreamCreateGroupParams) args;
List<Object> params = new LinkedList<>();
params.add("CREATE");
params.add(getRawName());
params.add(pps.getName());
params.add(pps.getId());
if (pps.isMakeStream()) {
params.add("MKSTREAM");
}
if (pps.getEntriesRead() > 0) {
params.add("ENTRIESREAD");
params.add(pps.getEntriesRead());
}
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XGROUP, params.toArray());
}
@Override
public void createGroup(String groupName) {
get(createGroupAsync(groupName));

@ -33,6 +33,18 @@ import java.util.concurrent.TimeUnit;
*/
public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
/**
* Creates consumer group.
* <p>
* Usage examples:
* <pre>
* StreamMessageId id = stream.createGroup(StreamCreateGroupArgs.name("test").id(id).makeStream());
* </pre>
*
* @param args method arguments object
*/
void createGroup(StreamCreateGroupArgs args);
/**
* Creates consumer group by name.
* Only new messages will be available for consumers of this group.

@ -33,6 +33,18 @@ import java.util.concurrent.TimeUnit;
*/
public interface RStreamAsync<K, V> extends RExpirableAsync {
/**
* Creates consumer group.
* <p>
* Usage examples:
* <pre>
* StreamMessageId id = stream.createGroup(StreamCreateGroupArgs.name("test").id(id).makeStream());
* </pre>
*
* @param args method arguments object
*/
RFuture<Void> createGroupAsync(StreamCreateGroupArgs args);
/**
* Creates consumer group by name.
*

@ -34,6 +34,18 @@ import reactor.core.publisher.Mono;
*/
public interface RStreamReactive<K, V> extends RExpirableReactive {
/**
* Creates consumer group.
* <p>
* Usage examples:
* <pre>
* StreamMessageId id = stream.createGroup(StreamCreateGroupArgs.name("test").id(id).makeStream());
* </pre>
*
* @param args method arguments object
*/
Mono<Void> createGroup(StreamCreateGroupArgs args);
/**
* Creates consumer group by name.
*

@ -35,6 +35,18 @@ import org.redisson.api.stream.*;
*/
public interface RStreamRx<K, V> extends RExpirableRx {
/**
* Creates consumer group.
* <p>
* Usage examples:
* <pre>
* StreamMessageId id = stream.createGroup(StreamCreateGroupArgs.name("test").id(id).makeStream());
* </pre>
*
* @param args method arguments object
*/
Completable createGroup(StreamCreateGroupArgs args);
/**
* Creates consumer group by name.
*

@ -0,0 +1,64 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.stream;
import org.redisson.api.StreamMessageId;
/**
* Arguments object for {@link org.redisson.api.RStream#createGroup(StreamCreateGroupArgs)} method.
*
* @author Nikita Koksharov
*
*/
public interface StreamCreateGroupArgs {
/**
* Defines entries_read argument
*
* @param amount entries_read argument
* @return arguments object
*/
StreamCreateGroupArgs entriesRead(int amount);
/**
* Defines whether a stream should be created if it doesn't exist.
*
* @return arguments object
*/
StreamCreateGroupArgs makeStream();
/**
* Defines Stream Message ID.
* Only new messages after defined stream <code>id</code> will
* be available for consumers of this group.
*
* @param id Stream Message ID
* @return arguments object
*/
StreamCreateGroupArgs id(StreamMessageId id);
/**
* Defines name of group.
* Only new messages will be available for consumers of this group.
*
* @param value name of group
* @return arguments object
*/
static StreamCreateGroupArgs name(String value) {
return new StreamCreateGroupParams(value);
}
}

@ -0,0 +1,69 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.stream;
import org.redisson.api.StreamMessageId;
/**
*
* @author Nikita Koksharov
*
*/
public final class StreamCreateGroupParams implements StreamCreateGroupArgs {
private final String name;
private boolean makeStream;
private int entriesRead;
private StreamMessageId id = StreamMessageId.NEWEST;
public StreamCreateGroupParams(String value) {
this.name = value;
}
@Override
public StreamCreateGroupArgs entriesRead(int amount) {
this.entriesRead = amount;
return this;
}
@Override
public StreamCreateGroupArgs makeStream() {
this.makeStream = true;
return this;
}
@Override
public StreamCreateGroupArgs id(StreamMessageId id) {
this.id = id;
return this;
}
public String getName() {
return name;
}
public boolean isMakeStream() {
return makeStream;
}
public int getEntriesRead() {
return entriesRead;
}
public StreamMessageId getId() {
return id;
}
}
Loading…
Cancel
Save