diff --git a/redisson/src/main/java/org/redisson/RedissonStream.java b/redisson/src/main/java/org/redisson/RedissonStream.java index 141792c73..4cdbcbe0b 100644 --- a/redisson/src/main/java/org/redisson/RedissonStream.java +++ b/redisson/src/main/java/org/redisson/RedissonStream.java @@ -19,6 +19,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; import org.redisson.api.*; +import org.redisson.api.listener.*; import org.redisson.api.stream.*; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; @@ -1396,6 +1397,7 @@ public class RedissonStream extends RedissonExpirable implements RStream trimAsync(StreamTrimArgs args) { return trimAsync(args, true); } @@ -1434,8 +1436,57 @@ public class RedissonStream extends RedissonExpirable implements RStream trimNonStrictAsync(StreamTrimArgs args) { return trimAsync(args, false); } + @Override + public RFuture addListenerAsync(ObjectListener listener) { + if (listener instanceof StreamAddListener) { + return addListenerAsync("__keyevent@*:xadd", (StreamAddListener) listener, StreamAddListener::onAdd); + } + if (listener instanceof StreamRemoveListener) { + return addListenerAsync("__keyevent@*:xdel", (StreamRemoveListener) listener, StreamRemoveListener::onRemove); + } + if (listener instanceof StreamCreateConsumerListener) { + return addListenerAsync("__keyevent@*:xgroup-createconsumer", (StreamCreateConsumerListener) listener, StreamCreateConsumerListener::onCreate); + } + if (listener instanceof StreamRemoveConsumerListener) { + return addListenerAsync("__keyevent@*:xgroup-delconsumer", (StreamRemoveConsumerListener) listener, StreamRemoveConsumerListener::onRemove); + } + if (listener instanceof StreamCreateGroupListener) { + return addListenerAsync("__keyevent@*:xgroup-create", (StreamCreateGroupListener) listener, StreamCreateGroupListener::onCreate); + } + if (listener instanceof StreamRemoveGroupListener) { + return addListenerAsync("__keyevent@*:xgroup-destroy", (StreamRemoveGroupListener) listener, StreamRemoveGroupListener::onRemove); + } + return super.addListenerAsync(listener); + } + + @Override + public int addListener(ObjectListener listener) { + if (listener instanceof StreamAddListener) { + return addListener("__keyevent@*:xadd", (StreamAddListener) listener, StreamAddListener::onAdd); + } + if (listener instanceof StreamRemoveListener) { + return addListener("__keyevent@*:xdel", (StreamRemoveListener) listener, StreamRemoveListener::onRemove); + } + if (listener instanceof StreamCreateConsumerListener) { + return addListener("__keyevent@*:xgroup-createconsumer", (StreamCreateConsumerListener) listener, StreamCreateConsumerListener::onCreate); + } + if (listener instanceof StreamRemoveConsumerListener) { + return addListener("__keyevent@*:xgroup-delconsumer", (StreamRemoveConsumerListener) listener, StreamRemoveConsumerListener::onRemove); + } + if (listener instanceof StreamCreateGroupListener) { + return addListener("__keyevent@*:xgroup-create", (StreamCreateGroupListener) listener, StreamCreateGroupListener::onCreate); + } + if (listener instanceof StreamRemoveGroupListener) { + return addListener("__keyevent@*:xgroup-destroy", (StreamRemoveGroupListener) listener, StreamRemoveGroupListener::onRemove); + } + if (listener instanceof StreamTrimListener) { + return addListener("__keyevent@*:xtrim", (StreamTrimListener) listener, StreamTrimListener::onTrim); + } + return super.addListener(listener); + } } diff --git a/redisson/src/main/java/org/redisson/api/RStream.java b/redisson/src/main/java/org/redisson/api/RStream.java index 405fd55f6..4060b1727 100644 --- a/redisson/src/main/java/org/redisson/api/RStream.java +++ b/redisson/src/main/java/org/redisson/api/RStream.java @@ -847,5 +847,24 @@ public interface RStream extends RStreamAsync, RExpirable { * @return list of info objects */ List listConsumers(String groupName); - + + /** + * Adds object event listener + * + * @see org.redisson.api.listener.StreamAddListener + * @see org.redisson.api.listener.StreamRemoveListener + * @see org.redisson.api.listener.StreamCreateGroupListener + * @see org.redisson.api.listener.StreamRemoveGroupListener + * @see org.redisson.api.listener.StreamCreateConsumerListener + * @see org.redisson.api.listener.StreamRemoveConsumerListener + * @see org.redisson.api.listener.StreamTrimListener + * @see org.redisson.api.ExpiredObjectListener + * @see org.redisson.api.DeletedObjectListener + * + * @param listener - object event listener + * @return listener id + */ + int addListener(ObjectListener listener); + + } diff --git a/redisson/src/main/java/org/redisson/api/RStreamAsync.java b/redisson/src/main/java/org/redisson/api/RStreamAsync.java index ffe8995a0..9cafc1ab8 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamAsync.java +++ b/redisson/src/main/java/org/redisson/api/RStreamAsync.java @@ -879,5 +879,23 @@ public interface RStreamAsync extends RExpirableAsync { * @return list of info objects */ RFuture> listConsumersAsync(String groupName); - + + /** + * Adds object event listener + * + * @see org.redisson.api.listener.StreamAddListener + * @see org.redisson.api.listener.StreamRemoveListener + * @see org.redisson.api.listener.StreamCreateGroupListener + * @see org.redisson.api.listener.StreamRemoveGroupListener + * @see org.redisson.api.listener.StreamCreateConsumerListener + * @see org.redisson.api.listener.StreamRemoveConsumerListener + * @see org.redisson.api.listener.StreamTrimListener + * @see org.redisson.api.ExpiredObjectListener + * @see org.redisson.api.DeletedObjectListener + * + * @param listener object event listener + * @return listener id + */ + RFuture addListenerAsync(ObjectListener listener); + } diff --git a/redisson/src/main/java/org/redisson/api/RStreamReactive.java b/redisson/src/main/java/org/redisson/api/RStreamReactive.java index 730ab2688..9eb6137a7 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamReactive.java +++ b/redisson/src/main/java/org/redisson/api/RStreamReactive.java @@ -15,13 +15,13 @@ */ package org.redisson.api; +import org.redisson.api.stream.*; +import reactor.core.publisher.Mono; + import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.redisson.api.stream.*; -import reactor.core.publisher.Mono; - /** * Reactive interface for Redis Stream object. *

@@ -817,5 +817,23 @@ public interface RStreamReactive extends RExpirableReactive { * @return map */ Mono>> pendingRange(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count); - + + /** + * Adds object event listener + * + * @see org.redisson.api.listener.StreamAddListener + * @see org.redisson.api.listener.StreamRemoveListener + * @see org.redisson.api.listener.StreamCreateGroupListener + * @see org.redisson.api.listener.StreamRemoveGroupListener + * @see org.redisson.api.listener.StreamCreateConsumerListener + * @see org.redisson.api.listener.StreamRemoveConsumerListener + * @see org.redisson.api.listener.StreamTrimListener + * @see org.redisson.api.ExpiredObjectListener + * @see org.redisson.api.DeletedObjectListener + * + * @param listener object event listener + * @return listener id + */ + Mono addListener(ObjectListener listener); + } diff --git a/redisson/src/main/java/org/redisson/api/RStreamRx.java b/redisson/src/main/java/org/redisson/api/RStreamRx.java index bd9d96904..668309dd7 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamRx.java +++ b/redisson/src/main/java/org/redisson/api/RStreamRx.java @@ -15,14 +15,14 @@ */ package org.redisson.api; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Single; import org.redisson.api.stream.*; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + /** * Reactive interface for Redis Stream object. *

@@ -818,5 +818,23 @@ public interface RStreamRx extends RExpirableRx { * @return map */ Single>> pendingRange(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count); - + + /** + * Adds object event listener + * + * @see org.redisson.api.listener.StreamAddListener + * @see org.redisson.api.listener.StreamRemoveListener + * @see org.redisson.api.listener.StreamCreateGroupListener + * @see org.redisson.api.listener.StreamRemoveGroupListener + * @see org.redisson.api.listener.StreamCreateConsumerListener + * @see org.redisson.api.listener.StreamRemoveConsumerListener + * @see org.redisson.api.listener.StreamTrimListener + * @see org.redisson.api.ExpiredObjectListener + * @see org.redisson.api.DeletedObjectListener + * + * @param listener object event listener + * @return listener id + */ + Single addListener(ObjectListener listener); + } diff --git a/redisson/src/main/java/org/redisson/api/listener/StreamAddListener.java b/redisson/src/main/java/org/redisson/api/listener/StreamAddListener.java new file mode 100644 index 000000000..2d23679a7 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/listener/StreamAddListener.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.listener; + +import org.redisson.api.ObjectListener; + +/** + * Redisson Object Event listener for xadd event + * published by Redis when an element added into Stream. + *

+ * Redis notify-keyspace-events setting should contain Et letters + * + * @author Nikita Koksharov + */ +public interface StreamAddListener extends ObjectListener { + + /** + * Invoked when a new entry is added to RStream object + * + * @param name object name + */ + void onAdd(String name); + +} diff --git a/redisson/src/main/java/org/redisson/api/listener/StreamCreateConsumerListener.java b/redisson/src/main/java/org/redisson/api/listener/StreamCreateConsumerListener.java new file mode 100644 index 000000000..5a919bfcf --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/listener/StreamCreateConsumerListener.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.listener; + +import org.redisson.api.ObjectListener; + +/** + * Redisson Object Event listener for xgroup-createconsumer event + * published by Redis when a Stream Consumer is created. + *

+ * Redis notify-keyspace-events setting should contain Et letters + * + * @author Nikita Koksharov + */ +public interface StreamCreateConsumerListener extends ObjectListener { + + /** + * Invoked when a Stream Consumer is created + * + * @param name object name + */ + void onCreate(String name); + +} diff --git a/redisson/src/main/java/org/redisson/api/listener/StreamCreateGroupListener.java b/redisson/src/main/java/org/redisson/api/listener/StreamCreateGroupListener.java new file mode 100644 index 000000000..b8086ed04 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/listener/StreamCreateGroupListener.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.listener; + +import org.redisson.api.ObjectListener; + +/** + * Redisson Object Event listener for xgroup-create event + * published by Redis when a Stream Group is created. + *

+ * Redis notify-keyspace-events setting should contain Et letters + * + * @author Nikita Koksharov + */ +public interface StreamCreateGroupListener extends ObjectListener { + + /** + * Invoked when a Stream Group is created + * + * @param name object name + */ + void onCreate(String name); + +} diff --git a/redisson/src/main/java/org/redisson/api/listener/StreamRemoveConsumerListener.java b/redisson/src/main/java/org/redisson/api/listener/StreamRemoveConsumerListener.java new file mode 100644 index 000000000..7bf780475 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/listener/StreamRemoveConsumerListener.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.listener; + +import org.redisson.api.ObjectListener; + +/** + * Redisson Object Event listener for xgroup-delconsumer event + * published by Redis when a Stream Consumer is removed. + *

+ * Redis notify-keyspace-events setting should contain Et letters + * + * @author Nikita Koksharov + */ +public interface StreamRemoveConsumerListener extends ObjectListener { + + /** + * Invoked when a Stream Consumer is removed + * + * @param name object name + */ + void onRemove(String name); + +} diff --git a/redisson/src/main/java/org/redisson/api/listener/StreamRemoveGroupListener.java b/redisson/src/main/java/org/redisson/api/listener/StreamRemoveGroupListener.java new file mode 100644 index 000000000..c39a7cc07 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/listener/StreamRemoveGroupListener.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.listener; + +import org.redisson.api.ObjectListener; + +/** + * Redisson Object Event listener for xgroup-destroy event + * published by Redis when a Stream Group is removed. + *

+ * Redis notify-keyspace-events setting should contain Et letters + * + * @author Nikita Koksharov + */ +public interface StreamRemoveGroupListener extends ObjectListener { + + /** + * Invoked when a Stream Group is removed + * + * @param name object name + */ + void onRemove(String name); + +} diff --git a/redisson/src/main/java/org/redisson/api/listener/StreamRemoveListener.java b/redisson/src/main/java/org/redisson/api/listener/StreamRemoveListener.java new file mode 100644 index 000000000..29b9fc970 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/listener/StreamRemoveListener.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.listener; + +import org.redisson.api.ObjectListener; + +/** + * Redisson Object Event listener for srem event + * published by Redis when an element removed from Stream. + *

+ * Redis notify-keyspace-events setting should contain Et letters + * + * @author Nikita Koksharov + */ +public interface StreamRemoveListener extends ObjectListener { + + /** + * Invoked when an element removed from Stream + * + * @param name object name + */ + void onRemove(String name); + +} diff --git a/redisson/src/main/java/org/redisson/api/listener/StreamTrimListener.java b/redisson/src/main/java/org/redisson/api/listener/StreamTrimListener.java new file mode 100644 index 000000000..c0e5ab2da --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/listener/StreamTrimListener.java @@ -0,0 +1,38 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.listener; + +import org.redisson.api.ObjectListener; + +/** + * Redisson Object Event listener for ltrim event + * published by Redis when trim operation is executed for Stream. + *

+ * Redis notify-keyspace-events setting should contain Et letters + * + * @author Nikita Koksharov + * + */ +public interface StreamTrimListener extends ObjectListener { + + /** + * Invoked when trim operation is executed for Stream + * + * @param name - name of object + */ + void onTrim(String name); + +} diff --git a/redisson/src/test/java/org/redisson/RedissonStreamTest.java b/redisson/src/test/java/org/redisson/RedissonStreamTest.java index 71efb2865..0970b0777 100644 --- a/redisson/src/test/java/org/redisson/RedissonStreamTest.java +++ b/redisson/src/test/java/org/redisson/RedissonStreamTest.java @@ -4,17 +4,41 @@ import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.redisson.api.*; +import org.redisson.api.listener.StreamAddListener; import org.redisson.api.stream.*; import org.redisson.client.RedisException; import java.time.Duration; import java.util.*; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; public class RedissonStreamTest extends RedisDockerTest { + @Test + public void testAddListener() { + testWithParams(redisson -> { + RStream ss = redisson.getStream("test"); + ss.createGroup(StreamCreateGroupArgs.name("test-group").makeStream()); + CountDownLatch latch = new CountDownLatch(1); + ss.addListener(new StreamAddListener() { + @Override + public void onAdd(String name) { + latch.countDown(); + } + }); + ss.add(StreamAddArgs.entry("test1", "test2")); + + try { + assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, NOTIFY_KEYSPACE_EVENTS, "Et"); + } + @Test public void testEmptyMap() { RStream stream = redisson.getStream("stream");