Feature - allow to create Redis stream group if it doesn't exist #1915

pull/1907/head
Nikita Koksharov 6 years ago
parent b77fda1e7e
commit c16a31b09b

@ -79,7 +79,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
@Override
public RFuture<Void> createGroupAsync(String groupName, StreamMessageId id) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "CREATE", getName(), groupName, id);
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "CREATE", getName(), groupName, id, "MKSTREAM");
}
@Override

@ -33,6 +33,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
/**
* Creates consumer group by name.
* Only new messages will be available for consumers of this group.
*
* @param groupName - name of group
*/
@ -42,7 +43,8 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* Creates consumer group by name and Stream Message ID.
* Only new messages after defined stream <code>id</code> will be available for consumers of this group.
* <p>
* {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creating
* {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creation
* {@link StreamMessageId#ALL} is used for all messages added before and after the moment of group creation
*
* @param groupName - name of group
* @param id - Stream Message ID

@ -26,26 +26,33 @@ public class StreamMessageId {
/**
* Defines id to receive Stream entries never delivered to any other consumer.
* <p>
* Used in {@link RStream#readGroup} and {@link RStreamAsync#readGroupAsync} methods
* Used in {@link RStream#readGroup} methods
*/
public static final StreamMessageId NEVER_DELIVERED = new StreamMessageId(-1);
/**
* Defines minimal id. Used in {@link RStream#range} and {@link RStreamAsync#rangeAsync} methods
* Defines minimal id. Used in {@link RStream#range} methods
*/
public static final StreamMessageId MIN = new StreamMessageId(-1);
/**
* Defines maximal id. Used in {@link RStream#range} and {@link RStreamAsync#rangeAsync} methods
* Defines maximal id. Used in {@link RStream#range} methods
*/
public static final StreamMessageId MAX = new StreamMessageId(-1);
/**
* Defines id to receive Stream entries added since method invocation.
* <p>
* Used in {@link RStream#read} and {@link RStreamAsync#readAsync} methods
* Used in {@link RStream#read}, {@link RStream#createGroup} methods
*/
public static final StreamMessageId NEWEST = new StreamMessageId(-1);
/**
* Defines id to receive all Stream entries.
* <p>
* Used in {@link RStream#read}, {@link RStream#createGroup} methods
*/
public static final StreamMessageId ALL = new StreamMessageId(-1);
private long id0;
private long id1;
@ -118,6 +125,9 @@ public class StreamMessageId {
if (this == MAX) {
return "+";
}
if (this == ALL) {
return "0";
}
return id0 + "-" + id1;
}

@ -259,6 +259,15 @@ public class RedissonStreamTest extends BaseTest {
// assertThat(s2.size()).isEqualTo(3);
}
@Test
public void testCreateEmpty() {
RStream<String, String> stream = redisson.getStream("test");
stream.createGroup("testGroup", StreamMessageId.ALL);
stream.add("1", "2");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s).hasSize(1);
}
@Test
public void testReadGroup() {

Loading…
Cancel
Save