Fixed - XPENDING command causes syntax error in redisson-spring-data-23. #2959

pull/2979/head
Nikita Koksharov 5 years ago
parent c1dd88be15
commit c67b0a7fa7

@ -290,6 +290,29 @@ public class RedissonStreamCommands implements RedisStreamCommands {
new ObjectListReplayDecoder(), new ObjectListReplayDecoder())), key, groupName);
}
private static class PendingMessageReplayDecoder implements MultiDecoder<PendingMessage> {
private String groupName;
public PendingMessageReplayDecoder(String groupName) {
this.groupName = groupName;
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
@Override
public PendingMessage decode(List<Object> parts, State state) {
PendingMessage pm = new PendingMessage(RecordId.of(parts.get(0).toString()),
Consumer.from(groupName, parts.get(1).toString()),
Duration.of(Long.valueOf(parts.get(2).toString()), ChronoUnit.MILLIS),
Long.valueOf(parts.get(3).toString()));
return pm;
}
}
private static class PendingMessagesReplayDecoder implements MultiDecoder<PendingMessages> {
private final String groupName;
@ -307,16 +330,7 @@ public class RedissonStreamCommands implements RedisStreamCommands {
@Override
public PendingMessages decode(List<Object> parts, State state) {
List<List<Object>> list = (List<List<Object>>) (Object) parts;
List<PendingMessage> pendingMessages = new ArrayList<>();
for (List<Object> entry : list) {
PendingMessage pm = new PendingMessage(RecordId.of(entry.get(0).toString()),
Consumer.from(groupName, entry.get(1).toString()),
Duration.of(Long.valueOf(entry.get(2).toString()), ChronoUnit.MILLIS),
Long.valueOf(entry.get(3).toString()));
pendingMessages.add(pm);
}
List<PendingMessage> pendingMessages = (List<PendingMessage>) (Object) parts;
return new PendingMessages(groupName, range, pendingMessages);
}
}
@ -327,6 +341,7 @@ public class RedissonStreamCommands implements RedisStreamCommands {
Assert.notNull(groupName, "Group name must not be null!");
List<Object> params = new ArrayList<>();
params.add(key);
params.add(groupName);
params.add(((Range.Bound<String>)options.getRange().getLowerBound()).getValue().orElse("-"));
@ -334,13 +349,18 @@ public class RedissonStreamCommands implements RedisStreamCommands {
if (options.getCount() != null) {
params.add(options.getCount());
} else {
params.add(10);
}
if (options.getConsumerName() != null) {
params.add(options.getConsumerName());
}
return connection.write(key, StringCodec.INSTANCE, new RedisCommand<>("XPENDING",
new PendingMessagesReplayDecoder(groupName, options.getRange())), params.toArray());
new ListMultiDecoder2(
new PendingMessagesReplayDecoder(groupName, options.getRange()),
new PendingMessageReplayDecoder(groupName))),
params.toArray());
}
@Override

@ -1,14 +1,10 @@
package org.redisson.spring.data.connection;
import org.junit.Test;
import org.redisson.api.RStream;
import org.redisson.api.StreamConsumer;
import org.redisson.api.StreamMessageId;
import org.springframework.data.redis.connection.stream.*;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@ -17,6 +13,24 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
public class RedissonStreamTest extends BaseConnectionTest {
@Test
public void testPending() {
connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup", ReadOffset.latest(), true);
PendingMessages p = connection.streamCommands().xPending("test".getBytes(), Consumer.from("testGroup", "test1"));
assertThat(p.size()).isEqualTo(0);
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("1".getBytes(), "1".getBytes()));
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("2".getBytes(), "2".getBytes()));
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("3".getBytes(), "3".getBytes()));
List<ByteRecord> l = connection.streamCommands().xReadGroup(Consumer.from("testGroup", "test1"), StreamOffset.create("test".getBytes(), ReadOffset.from(">")));
assertThat(l.size()).isEqualTo(3);
PendingMessages p2 = connection.streamCommands().xPending("test".getBytes(), Consumer.from("testGroup", "test1"));
assertThat(p2.size()).isEqualTo(3);
}
@Test
public void testGroups() {
connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup", ReadOffset.latest(), true);

Loading…
Cancel
Save