From 462eacb707ed3f9ee326ac772d5c409de1c3b618 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 23 Jun 2021 09:39:07 +0300 Subject: [PATCH] Fixed - RedissonReactiveSubscription removes listener after first 32 messages. #3667 --- .../RedissonReactiveSubscription.java | 12 ---------- .../RedissonReactiveSubscription.java | 12 ---------- .../RedissonReactiveSubscription.java | 12 ---------- .../RedissonReactiveSubscription.java | 12 ---------- .../RedissonReactiveSubscription.java | 11 --------- .../RedissonSubscribeReactiveTest.java | 23 +++++++++++++++++++ 6 files changed, 23 insertions(+), 59 deletions(-) diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index 5d47b1e87..c890ef263 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -223,7 +222,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { emitter.onRequest(n -> { monosListener.addListener(() -> { - AtomicLong counter = new AtomicLong(n); BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { @Override public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { @@ -234,11 +232,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()), ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message))); - - if (counter.decrementAndGet() == 0) { - disposable.dispose(); - emitter.complete(); - } } @Override @@ -248,11 +241,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg))); - - if (counter.decrementAndGet() == 0) { - disposable.dispose(); - emitter.complete(); - } } }; diff --git a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index 5d47b1e87..c890ef263 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -223,7 +222,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { emitter.onRequest(n -> { monosListener.addListener(() -> { - AtomicLong counter = new AtomicLong(n); BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { @Override public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { @@ -234,11 +232,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()), ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message))); - - if (counter.decrementAndGet() == 0) { - disposable.dispose(); - emitter.complete(); - } } @Override @@ -248,11 +241,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg))); - - if (counter.decrementAndGet() == 0) { - disposable.dispose(); - emitter.complete(); - } } }; diff --git a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index 5d47b1e87..c890ef263 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -223,7 +222,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { emitter.onRequest(n -> { monosListener.addListener(() -> { - AtomicLong counter = new AtomicLong(n); BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { @Override public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { @@ -234,11 +232,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()), ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message))); - - if (counter.decrementAndGet() == 0) { - disposable.dispose(); - emitter.complete(); - } } @Override @@ -248,11 +241,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg))); - - if (counter.decrementAndGet() == 0) { - disposable.dispose(); - emitter.complete(); - } } }; diff --git a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index 5d47b1e87..c890ef263 100644 --- a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -223,7 +222,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { emitter.onRequest(n -> { monosListener.addListener(() -> { - AtomicLong counter = new AtomicLong(n); BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { @Override public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { @@ -234,11 +232,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()), ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message))); - - if (counter.decrementAndGet() == 0) { - disposable.dispose(); - emitter.complete(); - } } @Override @@ -248,11 +241,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg))); - - if (counter.decrementAndGet() == 0) { - disposable.dispose(); - emitter.complete(); - } } }; diff --git a/redisson-spring-data/redisson-spring-data-25/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-25/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index 5d47b1e87..810d77da3 100644 --- a/redisson-spring-data/redisson-spring-data-25/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-25/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -223,7 +223,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { emitter.onRequest(n -> { monosListener.addListener(() -> { - AtomicLong counter = new AtomicLong(n); BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { @Override public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { @@ -234,11 +233,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()), ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message))); - - if (counter.decrementAndGet() == 0) { - disposable.dispose(); - emitter.complete(); - } } @Override @@ -248,11 +242,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg))); - - if (counter.decrementAndGet() == 0) { - disposable.dispose(); - emitter.complete(); - } } }; diff --git a/redisson-spring-data/redisson-spring-data-25/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java b/redisson-spring-data/redisson-spring-data-25/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java index 58b8b5f41..90319d433 100644 --- a/redisson-spring-data/redisson-spring-data-25/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java +++ b/redisson-spring-data/redisson-spring-data-25/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java @@ -15,8 +15,31 @@ import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import reactor.core.publisher.Mono; +import static org.assertj.core.api.Assertions.assertThat; + public class RedissonSubscribeReactiveTest extends BaseConnectionTest { + @Test + public void testPubSub() { + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + AtomicLong counter = new AtomicLong(); + + ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory); + template.listenTo(ChannelTopic.of("test")).flatMap(message -> { + counter.incrementAndGet(); + return Mono.empty(); + }).subscribe(); + + for (int i = 0; i < 40; i++) { + ReactiveRedisConnection connection = factory.getReactiveConnection(); + connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block(); + } + + Awaitility.await().atMost(Duration.ONE_SECOND).untilAsserted(() -> { + assertThat(counter.get()).isEqualTo(40); + }); + } + @Test public void testTemplate() { RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);