feat: Added autogenerated stream sequence id support to StreamMessageId

Signed-off-by: Manuel Polo <mrmx@duck.com>
pull/5600/head
Manuel Polo 1 year ago
parent db4fe1546a
commit 72c76be6b1
No known key found for this signature in database
GPG Key ID: 225EFA9B34C561E3

@ -65,9 +65,11 @@ public class StreamMessageId {
*/
public static final StreamMessageId ALL = new StreamMessageId(-1);
private long id0;
private final long id0;
private long id1;
private boolean autogenerateSequenceId;
public StreamMessageId(long id0) {
super();
this.id0 = id0;
@ -79,6 +81,11 @@ public class StreamMessageId {
this.id1 = id1;
}
public StreamMessageId autogenerateSequenceId() {
this.autogenerateSequenceId = true;
return this;
}
/**
* Returns first part of ID
*
@ -119,7 +126,7 @@ public class StreamMessageId {
return false;
if (id1 != other.id1)
return false;
return true;
return autogenerateSequenceId == other.autogenerateSequenceId;
}
@Override
@ -143,7 +150,7 @@ public class StreamMessageId {
return "*";
}
return id0 + "-" + id1;
return id0 + "-" + (autogenerateSequenceId ? "*" : id1);
}
}

@ -520,6 +520,31 @@ public class RedissonStreamTest extends RedisDockerTest {
assertThat(s2).isEmpty();
}
@Test
public void testAutogenerateStreamSequenceId() {
RStream<String, String> stream = redisson.getStream("test");
assertThat(stream.size()).isEqualTo(0);
StreamMessageId id = new StreamMessageId(1).autogenerateSequenceId();
Map<String, String> entry1 = new HashMap<>();
entry1.put("test", "value1");
Map<String, String> entry2 = new HashMap<>();
entry2.put("test", "value2");
stream.add(id,StreamAddArgs.entries(entry1));
stream.add(id,StreamAddArgs.entries(entry2));
Map<StreamMessageId, Map<String, String>> r = stream.range(10, StreamMessageId.MIN, StreamMessageId.MAX);
System.out.println("r:" + r);
assertThat(r).size().isEqualTo(2);
assertThat(r.keySet()).containsExactly(
new StreamMessageId(1,0),new StreamMessageId(1,1)
);
assertThat(r.get(new StreamMessageId(1,0))).isEqualTo(entry1);
assertThat(r.get(new StreamMessageId(1,1))).isEqualTo(entry2);
}
@Test
public void testRangeReversed() {
RStream<String, String> stream = redisson.getStream("test");

Loading…
Cancel
Save