diff --git a/redisson/src/main/java/org/redisson/RedissonStream.java b/redisson/src/main/java/org/redisson/RedissonStream.java index 438386b8c..c3cb74ac1 100644 --- a/redisson/src/main/java/org/redisson/RedissonStream.java +++ b/redisson/src/main/java/org/redisson/RedissonStream.java @@ -79,7 +79,7 @@ public class RedissonStream extends RedissonExpirable implements RStream createGroupAsync(String groupName, StreamMessageId id) { - return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "CREATE", getName(), groupName, id); + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "CREATE", getName(), groupName, id, "MKSTREAM"); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RStream.java b/redisson/src/main/java/org/redisson/api/RStream.java index a3c5ecf6c..7e02e97b9 100644 --- a/redisson/src/main/java/org/redisson/api/RStream.java +++ b/redisson/src/main/java/org/redisson/api/RStream.java @@ -33,6 +33,7 @@ public interface RStream extends RStreamAsync, RExpirable { /** * Creates consumer group by name. + * Only new messages will be available for consumers of this group. * * @param groupName - name of group */ @@ -42,7 +43,8 @@ public interface RStream extends RStreamAsync, RExpirable { * Creates consumer group by name and Stream Message ID. * Only new messages after defined stream id will be available for consumers of this group. *

- * {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creating + * {@link StreamMessageId#NEWEST} is used for messages arrived since the moment of group creation + * {@link StreamMessageId#ALL} is used for all messages added before and after the moment of group creation * * @param groupName - name of group * @param id - Stream Message ID diff --git a/redisson/src/main/java/org/redisson/api/StreamMessageId.java b/redisson/src/main/java/org/redisson/api/StreamMessageId.java index 903741055..86c605cf1 100644 --- a/redisson/src/main/java/org/redisson/api/StreamMessageId.java +++ b/redisson/src/main/java/org/redisson/api/StreamMessageId.java @@ -26,26 +26,33 @@ public class StreamMessageId { /** * Defines id to receive Stream entries never delivered to any other consumer. *

- * Used in {@link RStream#readGroup} and {@link RStreamAsync#readGroupAsync} methods + * Used in {@link RStream#readGroup} methods */ public static final StreamMessageId NEVER_DELIVERED = new StreamMessageId(-1); /** - * Defines minimal id. Used in {@link RStream#range} and {@link RStreamAsync#rangeAsync} methods + * Defines minimal id. Used in {@link RStream#range} methods */ public static final StreamMessageId MIN = new StreamMessageId(-1); /** - * Defines maximal id. Used in {@link RStream#range} and {@link RStreamAsync#rangeAsync} methods + * Defines maximal id. Used in {@link RStream#range} methods */ public static final StreamMessageId MAX = new StreamMessageId(-1); /** * Defines id to receive Stream entries added since method invocation. *

- * Used in {@link RStream#read} and {@link RStreamAsync#readAsync} methods + * Used in {@link RStream#read}, {@link RStream#createGroup} methods */ public static final StreamMessageId NEWEST = new StreamMessageId(-1); + + /** + * Defines id to receive all Stream entries. + *

+ * Used in {@link RStream#read}, {@link RStream#createGroup} methods + */ + public static final StreamMessageId ALL = new StreamMessageId(-1); private long id0; private long id1; @@ -118,6 +125,9 @@ public class StreamMessageId { if (this == MAX) { return "+"; } + if (this == ALL) { + return "0"; + } return id0 + "-" + id1; } diff --git a/redisson/src/test/java/org/redisson/RedissonStreamTest.java b/redisson/src/test/java/org/redisson/RedissonStreamTest.java index b740b6462..4821ad1c7 100644 --- a/redisson/src/test/java/org/redisson/RedissonStreamTest.java +++ b/redisson/src/test/java/org/redisson/RedissonStreamTest.java @@ -259,6 +259,15 @@ public class RedissonStreamTest extends BaseTest { // assertThat(s2.size()).isEqualTo(3); } + @Test + public void testCreateEmpty() { + RStream stream = redisson.getStream("test"); + stream.createGroup("testGroup", StreamMessageId.ALL); + stream.add("1", "2"); + + Map> s = stream.readGroup("testGroup", "consumer1"); + assertThat(s).hasSize(1); + } @Test public void testReadGroup() {