Feature - Add listeners for RStream object #5552

pull/5564/head
Nikita Koksharov 1 year ago
parent db7f391386
commit 81443fd413

@ -19,6 +19,7 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import org.redisson.api.*;
import org.redisson.api.listener.*;
import org.redisson.api.stream.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
@ -1396,6 +1397,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
return get(trimAsync(args));
}
@Override
public RFuture<Long> trimAsync(StreamTrimArgs args) {
return trimAsync(args, true);
}
@ -1434,8 +1436,57 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
return get(trimNonStrictAsync(args));
}
@Override
public RFuture<Long> trimNonStrictAsync(StreamTrimArgs args) {
return trimAsync(args, false);
}
@Override
public RFuture<Integer> addListenerAsync(ObjectListener listener) {
if (listener instanceof StreamAddListener) {
return addListenerAsync("__keyevent@*:xadd", (StreamAddListener) listener, StreamAddListener::onAdd);
}
if (listener instanceof StreamRemoveListener) {
return addListenerAsync("__keyevent@*:xdel", (StreamRemoveListener) listener, StreamRemoveListener::onRemove);
}
if (listener instanceof StreamCreateConsumerListener) {
return addListenerAsync("__keyevent@*:xgroup-createconsumer", (StreamCreateConsumerListener) listener, StreamCreateConsumerListener::onCreate);
}
if (listener instanceof StreamRemoveConsumerListener) {
return addListenerAsync("__keyevent@*:xgroup-delconsumer", (StreamRemoveConsumerListener) listener, StreamRemoveConsumerListener::onRemove);
}
if (listener instanceof StreamCreateGroupListener) {
return addListenerAsync("__keyevent@*:xgroup-create", (StreamCreateGroupListener) listener, StreamCreateGroupListener::onCreate);
}
if (listener instanceof StreamRemoveGroupListener) {
return addListenerAsync("__keyevent@*:xgroup-destroy", (StreamRemoveGroupListener) listener, StreamRemoveGroupListener::onRemove);
}
return super.addListenerAsync(listener);
}
@Override
public int addListener(ObjectListener listener) {
if (listener instanceof StreamAddListener) {
return addListener("__keyevent@*:xadd", (StreamAddListener) listener, StreamAddListener::onAdd);
}
if (listener instanceof StreamRemoveListener) {
return addListener("__keyevent@*:xdel", (StreamRemoveListener) listener, StreamRemoveListener::onRemove);
}
if (listener instanceof StreamCreateConsumerListener) {
return addListener("__keyevent@*:xgroup-createconsumer", (StreamCreateConsumerListener) listener, StreamCreateConsumerListener::onCreate);
}
if (listener instanceof StreamRemoveConsumerListener) {
return addListener("__keyevent@*:xgroup-delconsumer", (StreamRemoveConsumerListener) listener, StreamRemoveConsumerListener::onRemove);
}
if (listener instanceof StreamCreateGroupListener) {
return addListener("__keyevent@*:xgroup-create", (StreamCreateGroupListener) listener, StreamCreateGroupListener::onCreate);
}
if (listener instanceof StreamRemoveGroupListener) {
return addListener("__keyevent@*:xgroup-destroy", (StreamRemoveGroupListener) listener, StreamRemoveGroupListener::onRemove);
}
if (listener instanceof StreamTrimListener) {
return addListener("__keyevent@*:xtrim", (StreamTrimListener) listener, StreamTrimListener::onTrim);
}
return super.addListener(listener);
}
}

@ -847,5 +847,24 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @return list of info objects
*/
List<StreamConsumer> listConsumers(String groupName);
/**
* Adds object event listener
*
* @see org.redisson.api.listener.StreamAddListener
* @see org.redisson.api.listener.StreamRemoveListener
* @see org.redisson.api.listener.StreamCreateGroupListener
* @see org.redisson.api.listener.StreamRemoveGroupListener
* @see org.redisson.api.listener.StreamCreateConsumerListener
* @see org.redisson.api.listener.StreamRemoveConsumerListener
* @see org.redisson.api.listener.StreamTrimListener
* @see org.redisson.api.ExpiredObjectListener
* @see org.redisson.api.DeletedObjectListener
*
* @param listener - object event listener
* @return listener id
*/
int addListener(ObjectListener listener);
}

@ -879,5 +879,23 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @return list of info objects
*/
RFuture<List<StreamConsumer>> listConsumersAsync(String groupName);
/**
* Adds object event listener
*
* @see org.redisson.api.listener.StreamAddListener
* @see org.redisson.api.listener.StreamRemoveListener
* @see org.redisson.api.listener.StreamCreateGroupListener
* @see org.redisson.api.listener.StreamRemoveGroupListener
* @see org.redisson.api.listener.StreamCreateConsumerListener
* @see org.redisson.api.listener.StreamRemoveConsumerListener
* @see org.redisson.api.listener.StreamTrimListener
* @see org.redisson.api.ExpiredObjectListener
* @see org.redisson.api.DeletedObjectListener
*
* @param listener object event listener
* @return listener id
*/
RFuture<Integer> addListenerAsync(ObjectListener listener);
}

@ -15,13 +15,13 @@
*/
package org.redisson.api;
import org.redisson.api.stream.*;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.redisson.api.stream.*;
import reactor.core.publisher.Mono;
/**
* Reactive interface for Redis Stream object.
* <p>
@ -817,5 +817,23 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @return map
*/
Mono<Map<StreamMessageId, Map<K, V>>> pendingRange(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Adds object event listener
*
* @see org.redisson.api.listener.StreamAddListener
* @see org.redisson.api.listener.StreamRemoveListener
* @see org.redisson.api.listener.StreamCreateGroupListener
* @see org.redisson.api.listener.StreamRemoveGroupListener
* @see org.redisson.api.listener.StreamCreateConsumerListener
* @see org.redisson.api.listener.StreamRemoveConsumerListener
* @see org.redisson.api.listener.StreamTrimListener
* @see org.redisson.api.ExpiredObjectListener
* @see org.redisson.api.DeletedObjectListener
*
* @param listener object event listener
* @return listener id
*/
Mono<Integer> addListener(ObjectListener listener);
}

@ -15,14 +15,14 @@
*/
package org.redisson.api;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Single;
import org.redisson.api.stream.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Reactive interface for Redis Stream object.
* <p>
@ -818,5 +818,23 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @return map
*/
Single<Map<StreamMessageId, Map<K, V>>> pendingRange(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count);
/**
* Adds object event listener
*
* @see org.redisson.api.listener.StreamAddListener
* @see org.redisson.api.listener.StreamRemoveListener
* @see org.redisson.api.listener.StreamCreateGroupListener
* @see org.redisson.api.listener.StreamRemoveGroupListener
* @see org.redisson.api.listener.StreamCreateConsumerListener
* @see org.redisson.api.listener.StreamRemoveConsumerListener
* @see org.redisson.api.listener.StreamTrimListener
* @see org.redisson.api.ExpiredObjectListener
* @see org.redisson.api.DeletedObjectListener
*
* @param listener object event listener
* @return listener id
*/
Single<Integer> addListener(ObjectListener listener);
}

@ -0,0 +1,37 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.listener;
import org.redisson.api.ObjectListener;
/**
* Redisson Object Event listener for <b>xadd</b> event
* published by Redis when an element added into Stream.
* <p>
* Redis notify-keyspace-events setting should contain Et letters
*
* @author Nikita Koksharov
*/
public interface StreamAddListener extends ObjectListener {
/**
* Invoked when a new entry is added to RStream object
*
* @param name object name
*/
void onAdd(String name);
}

@ -0,0 +1,37 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.listener;
import org.redisson.api.ObjectListener;
/**
* Redisson Object Event listener for <b>xgroup-createconsumer</b> event
* published by Redis when a Stream Consumer is created.
* <p>
* Redis notify-keyspace-events setting should contain Et letters
*
* @author Nikita Koksharov
*/
public interface StreamCreateConsumerListener extends ObjectListener {
/**
* Invoked when a Stream Consumer is created
*
* @param name object name
*/
void onCreate(String name);
}

@ -0,0 +1,37 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.listener;
import org.redisson.api.ObjectListener;
/**
* Redisson Object Event listener for <b>xgroup-create</b> event
* published by Redis when a Stream Group is created.
* <p>
* Redis notify-keyspace-events setting should contain Et letters
*
* @author Nikita Koksharov
*/
public interface StreamCreateGroupListener extends ObjectListener {
/**
* Invoked when a Stream Group is created
*
* @param name object name
*/
void onCreate(String name);
}

@ -0,0 +1,37 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.listener;
import org.redisson.api.ObjectListener;
/**
* Redisson Object Event listener for <b>xgroup-delconsumer</b> event
* published by Redis when a Stream Consumer is removed.
* <p>
* Redis notify-keyspace-events setting should contain Et letters
*
* @author Nikita Koksharov
*/
public interface StreamRemoveConsumerListener extends ObjectListener {
/**
* Invoked when a Stream Consumer is removed
*
* @param name object name
*/
void onRemove(String name);
}

@ -0,0 +1,37 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.listener;
import org.redisson.api.ObjectListener;
/**
* Redisson Object Event listener for <b>xgroup-destroy</b> event
* published by Redis when a Stream Group is removed.
* <p>
* Redis notify-keyspace-events setting should contain Et letters
*
* @author Nikita Koksharov
*/
public interface StreamRemoveGroupListener extends ObjectListener {
/**
* Invoked when a Stream Group is removed
*
* @param name object name
*/
void onRemove(String name);
}

@ -0,0 +1,37 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.listener;
import org.redisson.api.ObjectListener;
/**
* Redisson Object Event listener for <b>srem</b> event
* published by Redis when an element removed from Stream.
* <p>
* Redis notify-keyspace-events setting should contain Et letters
*
* @author Nikita Koksharov
*/
public interface StreamRemoveListener extends ObjectListener {
/**
* Invoked when an element removed from Stream
*
* @param name object name
*/
void onRemove(String name);
}

@ -0,0 +1,38 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.listener;
import org.redisson.api.ObjectListener;
/**
* Redisson Object Event listener for <b>ltrim</b> event
* published by Redis when trim operation is executed for Stream.
* <p>
* Redis notify-keyspace-events setting should contain Et letters
*
* @author Nikita Koksharov
*
*/
public interface StreamTrimListener extends ObjectListener {
/**
* Invoked when trim operation is executed for Stream
*
* @param name - name of object
*/
void onTrim(String name);
}

@ -4,17 +4,41 @@ import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.api.*;
import org.redisson.api.listener.StreamAddListener;
import org.redisson.api.stream.*;
import org.redisson.client.RedisException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonStreamTest extends RedisDockerTest {
@Test
public void testAddListener() {
testWithParams(redisson -> {
RStream<String, String> ss = redisson.getStream("test");
ss.createGroup(StreamCreateGroupArgs.name("test-group").makeStream());
CountDownLatch latch = new CountDownLatch(1);
ss.addListener(new StreamAddListener() {
@Override
public void onAdd(String name) {
latch.countDown();
}
});
ss.add(StreamAddArgs.entry("test1", "test2"));
try {
assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, NOTIFY_KEYSPACE_EVENTS, "Et");
}
@Test
public void testEmptyMap() {
RStream<Object, Object> stream = redisson.getStream("stream");

Loading…
Cancel
Save