diff --git a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonStreamCommands.java b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonStreamCommands.java index 00c026001..6c7757bac 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonStreamCommands.java +++ b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonStreamCommands.java @@ -290,6 +290,29 @@ public class RedissonStreamCommands implements RedisStreamCommands { new ObjectListReplayDecoder(), new ObjectListReplayDecoder())), key, groupName); } + private static class PendingMessageReplayDecoder implements MultiDecoder<PendingMessage> { + + private String groupName; + + public PendingMessageReplayDecoder(String groupName) { + this.groupName = groupName; + } + + @Override + public Decoder<Object> getDecoder(int paramNum, State state) { + return null; + } + + @Override + public PendingMessage decode(List<Object> parts, State state) { + PendingMessage pm = new PendingMessage(RecordId.of(parts.get(0).toString()), + Consumer.from(groupName, parts.get(1).toString()), + Duration.of(Long.valueOf(parts.get(2).toString()), ChronoUnit.MILLIS), + Long.valueOf(parts.get(3).toString())); + return pm; + } + } + private static class PendingMessagesReplayDecoder implements MultiDecoder<PendingMessages> { private final String groupName; @@ -307,16 +330,7 @@ public class RedissonStreamCommands implements RedisStreamCommands { @Override public PendingMessages decode(List<Object> parts, State state) { - List<List<Object>> list = (List<List<Object>>) (Object) parts; - List<PendingMessage> pendingMessages = new ArrayList<>(); - for (List<Object> entry : list) { - PendingMessage pm = new PendingMessage(RecordId.of(entry.get(0).toString()), - Consumer.from(groupName, entry.get(1).toString()), - Duration.of(Long.valueOf(entry.get(2).toString()), ChronoUnit.MILLIS), - Long.valueOf(entry.get(3).toString())); - - pendingMessages.add(pm); - } + List<PendingMessage> pendingMessages = (List<PendingMessage>) (Object) parts; return new PendingMessages(groupName, range, pendingMessages); } } @@ -327,6 +341,7 @@ public class RedissonStreamCommands implements RedisStreamCommands { Assert.notNull(groupName, "Group name must not be null!"); List<Object> params = new ArrayList<>(); + params.add(key); params.add(groupName); params.add(((Range.Bound<String>)options.getRange().getLowerBound()).getValue().orElse("-")); @@ -334,13 +349,18 @@ public class RedissonStreamCommands implements RedisStreamCommands { if (options.getCount() != null) { params.add(options.getCount()); + } else { + params.add(10); } if (options.getConsumerName() != null) { params.add(options.getConsumerName()); } return connection.write(key, StringCodec.INSTANCE, new RedisCommand<>("XPENDING", - new PendingMessagesReplayDecoder(groupName, options.getRange())), params.toArray()); + new ListMultiDecoder2( + new PendingMessagesReplayDecoder(groupName, options.getRange()), + new PendingMessageReplayDecoder(groupName))), + params.toArray()); } @Override diff --git a/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonStreamTest.java b/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonStreamTest.java index a84aee97f..dce81f229 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonStreamTest.java +++ b/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonStreamTest.java @@ -1,14 +1,10 @@ package org.redisson.spring.data.connection; import org.junit.Test; -import org.redisson.api.RStream; -import org.redisson.api.StreamConsumer; -import org.redisson.api.StreamMessageId; import org.springframework.data.redis.connection.stream.*; import java.util.Collections; import java.util.List; -import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; @@ -17,6 +13,24 @@ import static org.assertj.core.api.Assertions.assertThat; */ public class RedissonStreamTest extends BaseConnectionTest { + @Test + public void testPending() { + connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup", ReadOffset.latest(), true); + + PendingMessages p = connection.streamCommands().xPending("test".getBytes(), Consumer.from("testGroup", "test1")); + assertThat(p.size()).isEqualTo(0); + + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("1".getBytes(), "1".getBytes())); + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("2".getBytes(), "2".getBytes())); + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("3".getBytes(), "3".getBytes())); + + List<ByteRecord> l = connection.streamCommands().xReadGroup(Consumer.from("testGroup", "test1"), StreamOffset.create("test".getBytes(), ReadOffset.from(">"))); + assertThat(l.size()).isEqualTo(3); + + PendingMessages p2 = connection.streamCommands().xPending("test".getBytes(), Consumer.from("testGroup", "test1")); + assertThat(p2.size()).isEqualTo(3); + } + @Test public void testGroups() { connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup", ReadOffset.latest(), true);