Feature - removeGroup, removeConsumer, updateGroupMessageId methods added to RStream object. #1490

pull/1792/head
Nikita Koksharov 6 years ago
parent 2a86885f20
commit 4632aee75a

@ -110,8 +110,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, int count,
String consumerName) {
public RFuture<List<PendingEntry>> listPendingAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, getName(), groupName, startId, endId, count, consumerName);
}
@ -126,9 +125,8 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count,
String consumerName) {
return get(listPendingAsync(groupName, startId, endId, count, consumerName));
public List<PendingEntry> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count) {
return get(listPendingAsync(groupName, consumerName, startId, endId, count));
}
@Override
@ -728,4 +726,34 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
return get(trimNonStrictAsync(count));
}
@Override
public RFuture<Void> removeGroupAsync(String groupName) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "DESTROY", getName(), groupName);
}
@Override
public void removeGroup(String groupName) {
get(removeGroupAsync(groupName));
}
@Override
public RFuture<Long> removeConsumerAsync(String groupName, String consumerName) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP_LONG, "DELCONSUMER", getName(), groupName, consumerName);
}
@Override
public long removeConsumer(String groupName, String consumerName) {
return get(removeConsumerAsync(groupName, consumerName));
}
@Override
public RFuture<Void> updateGroupMessageIdAsync(String groupName, StreamMessageId id) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XGROUP, "SETID", getName(), groupName, id);
}
@Override
public void updateGroupMessageId(String groupName, StreamMessageId id) {
get(updateGroupMessageIdAsync(groupName, id));
}
}

@ -49,6 +49,30 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
*/
void createGroup(String groupName, StreamMessageId id);
/**
* Removes group by name.
*
* @param groupName - name of group
*/
void removeGroup(String groupName);
/**
* Removes consumer of the group by name.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @return number of pending messages owned by consumer
*/
long removeConsumer(String groupName, String consumerName);
/**
* Updates next message id delivered to consumers.
*
* @param groupName - name of group
* @param id - Stream Message ID
*/
void updateGroupMessageId(String groupName, StreamMessageId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
*
@ -95,7 +119,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param count - amount of messages
* @return list
*/
List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count, String consumerName);
List<PendingEntry> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Transfers ownership of pending messages by id to a new consumer

@ -50,6 +50,32 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @return void
*/
RFuture<Void> createGroupAsync(String groupName, StreamMessageId id);
/**
* Removes group by name.
*
* @param groupName - name of group
* @return void
*/
RFuture<Void> removeGroupAsync(String groupName);
/**
* Removes consumer of the group by name.
*
* @param groupName - name of group
* @param consumerName - name of consumer
* @return number of pending messages owned by consumer
*/
RFuture<Long> removeConsumerAsync(String groupName, String consumerName);
/**
* Updates next message id delivered to consumers.
*
* @param groupName - name of group
* @param id - Stream Message ID
* @return void
*/
RFuture<Void> updateGroupMessageIdAsync(String groupName, StreamMessageId id);
/**
* Marks pending messages by group name and stream <code>ids</code> as correctly processed.
@ -97,7 +123,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param count - amount of messages
* @return list
*/
RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, int count, String consumerName);
RFuture<List<PendingEntry>> listPendingAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Transfers ownership of pending messages by id to a new consumer

@ -407,6 +407,7 @@ public interface RedisCommands {
RedisStrictCommand<StreamMessageId> XADD = new RedisStrictCommand<StreamMessageId>("XADD", new StreamIdConvertor());
RedisStrictCommand<Void> XGROUP = new RedisStrictCommand<Void>("XGROUP", new VoidReplayConvertor());
RedisStrictCommand<Long> XGROUP_LONG = new RedisStrictCommand<Long>("XGROUP");
RedisStrictCommand<Void> XADD_VOID = new RedisStrictCommand<Void>("XADD", new VoidReplayConvertor());
RedisStrictCommand<Long> XLEN = new RedisStrictCommand<Long>("XLEN");
RedisStrictCommand<Long> XACK = new RedisStrictCommand<Long>("XACK");

@ -13,11 +13,68 @@ import org.redisson.api.PendingEntry;
import org.redisson.api.PendingResult;
import org.redisson.api.RStream;
import org.redisson.api.StreamMessageId;
import org.redisson.client.RedisException;
public class RedissonStreamTest extends BaseTest {
@Test
public void testRemove() {
public void testUpdateGroupMessageId() {
RStream<String, String> stream = redisson.getStream("test");
StreamMessageId id = stream.add("0", "0");
stream.createGroup("testGroup");
StreamMessageId id1 = stream.add("1", "1");
System.out.println("id1 " + id1);
StreamMessageId id2 = stream.add("2", "2");
System.out.println("id2 " + id2);
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
stream.updateGroupMessageId("testGroup", id);
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2");
assertThat(s2.size()).isEqualTo(2);
}
@Test
public void testRemoveConsumer() {
RStream<String, String> stream = redisson.getStream("test");
stream.add("0", "0");
stream.createGroup("testGroup");
StreamMessageId id1 = stream.add("1", "1");
StreamMessageId id2 = stream.add("2", "2");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1");
assertThat(s.size()).isEqualTo(2);
assertThat(stream.removeConsumer("testGroup", "consumer1")).isEqualTo(2);
assertThat(stream.removeConsumer("testGroup", "consumer2")).isZero();
}
@Test(expected = RedisException.class)
public void testRemoveGroup() {
RStream<String, String> stream = redisson.getStream("test");
stream.add("0", "0");
stream.createGroup("testGroup");
StreamMessageId id1 = stream.add("1", "1");
StreamMessageId id2 = stream.add("2", "2");
stream.removeGroup("testGroup");
stream.readGroup("testGroup", "consumer1");
}
@Test
public void testRemoveMessages() {
RStream<String, String> stream = redisson.getStream("test");
StreamMessageId id1 = stream.add("0", "0");
@ -116,7 +173,7 @@ public class RedissonStreamTest extends BaseTest {
assertThat(pendingEntry.getLastTimeDelivered()).isOne();
}
List<PendingEntry> list2 = stream.listPending("testGroup", StreamMessageId.MIN, StreamMessageId.MAX, 10, "consumer1");
List<PendingEntry> list2 = stream.listPending("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 10);
assertThat(list2.size()).isEqualTo(2);
for (PendingEntry pendingEntry : list2) {
assertThat(pendingEntry.getId()).isIn(id1, id2);

Loading…
Cancel
Save