From 15c464013d295aaae83bb3fd6daaa273cded3188 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 13 Apr 2020 13:54:20 +0300 Subject: [PATCH] Fixed - RedissonReactiveSubscription.subscribe and receive methods aren't synchronized. #2686 --- .../RedissonReactiveSubscription.java | 219 ++++++++++++------ .../RedissonSubscribeReactiveTest.java | 20 ++ .../RedissonReactiveSubscription.java | 219 ++++++++++++------ .../RedissonSubscribeReactiveTest.java | 20 ++ 4 files changed, 342 insertions(+), 136 deletions(-) diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index 7db6a0b25..adc406f63 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -47,9 +47,47 @@ import reactor.core.publisher.Mono; */ public class RedissonReactiveSubscription implements ReactiveSubscription { + public static class ListenableCounter { + + private int state; + private Runnable r; + + public synchronized void acquire() { + state++; + } + + public void release() { + synchronized (this) { + state--; + if (state != 0) { + return; + } + } + + if (r != null) { + r.run(); + r = null; + } + } + + public synchronized void addListener(Runnable r) { + synchronized (this) { + if (state != 0) { + this.r = r; + return; + } + } + + r.run(); + } + + } + private final Map channels = new ConcurrentHashMap(); private final Map patterns = new ConcurrentHashMap(); - + + private final ListenableCounter monosListener = new ListenableCounter(); + private final PublishSubscribeService subscribeService; public RedissonReactiveSubscription(ConnectionManager connectionManager) { @@ -58,14 +96,21 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public Mono subscribe(ByteBuffer... channels) { - RedissonPromise result = new RedissonPromise(); - CountableListener listener = new CountableListener(result, null, channels.length); - for (ByteBuffer channel : channels) { - RFuture f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel)); - f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(channel, res)); - f.onComplete(listener); - } - return Mono.fromFuture(result); + monosListener.acquire(); + return Mono.defer(() -> { + RedissonPromise result = new RedissonPromise(); + result.onComplete((r, ex) -> { + monosListener.release(); + }); + CountableListener listener = new CountableListener(result, null, channels.length); + for (ByteBuffer channel : channels) { + RFuture f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel)); + f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(channel, res)); + f.onComplete(listener); + } + + return Mono.fromFuture(result); + }); } protected ChannelName toChannelName(ByteBuffer channel) { @@ -74,14 +119,20 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public Mono pSubscribe(ByteBuffer... patterns) { - RedissonPromise result = new RedissonPromise(); - CountableListener listener = new CountableListener(result, null, patterns.length); - for (ByteBuffer channel : patterns) { - RFuture f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE); - f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(channel, res)); - f.onComplete(listener); - } - return Mono.fromFuture(result); + monosListener.acquire(); + return Mono.defer(() -> { + RedissonPromise result = new RedissonPromise(); + result.onComplete((r, ex) -> { + monosListener.release(); + }); + CountableListener listener = new CountableListener(result, null, patterns.length); + for (ByteBuffer channel : patterns) { + RFuture f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE); + f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(channel, res)); + f.onComplete(listener); + } + return Mono.fromFuture(result); + }); } @Override @@ -91,13 +142,28 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public Mono unsubscribe(ByteBuffer... channels) { - RedissonPromise result = new RedissonPromise(); - CountableListener listener = new CountableListener(result, null, channels.length); - for (ByteBuffer channel : channels) { - RFuture f = subscribeService.unsubscribe(toChannelName(channel), PubSubType.UNSUBSCRIBE); - f.onComplete(listener); - } - return Mono.fromFuture(result); + monosListener.acquire(); + return Mono.defer(() -> { + RedissonPromise result = new RedissonPromise(); + result.onComplete((r, ex) -> { + monosListener.release(); + }); + CountableListener listener = new CountableListener(result, null, channels.length); + for (ByteBuffer channel : channels) { + ChannelName cn = toChannelName(channel); + RFuture f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE); + f.onComplete((res, e) -> { + synchronized (RedissonReactiveSubscription.this.channels) { + PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(channel); + if (!entry.hasListeners(cn)) { + RedissonReactiveSubscription.this.channels.remove(channel); + } + } + }); + f.onComplete(listener); + } + return Mono.fromFuture(result); + }); } @Override @@ -107,13 +173,28 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public Mono pUnsubscribe(ByteBuffer... patterns) { - RedissonPromise result = new RedissonPromise(); - CountableListener listener = new CountableListener(result, null, patterns.length); - for (ByteBuffer channel : patterns) { - RFuture f = subscribeService.unsubscribe(toChannelName(channel), PubSubType.PUNSUBSCRIBE); - f.onComplete(listener); - } - return Mono.fromFuture(result); + monosListener.acquire(); + return Mono.defer(() -> { + RedissonPromise result = new RedissonPromise(); + result.onComplete((r, ex) -> { + monosListener.release(); + }); + CountableListener listener = new CountableListener(result, null, patterns.length); + for (ByteBuffer channel : patterns) { + ChannelName cn = toChannelName(channel); + RFuture f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE); + f.onComplete((res, e) -> { + synchronized (RedissonReactiveSubscription.this.patterns) { + PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(channel); + if (!entry.hasListeners(cn)) { + RedissonReactiveSubscription.this.patterns.remove(channel); + } + } + }); + f.onComplete(listener); + } + return Mono.fromFuture(result); + }); } @Override @@ -134,52 +215,54 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { if (flux.get() != null) { return flux.get(); } - + Flux> f = Flux.>create(emitter -> { emitter.onRequest(n -> { - - AtomicLong counter = new AtomicLong(n); - BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { - @Override - public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { - emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()), - ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message))); - - if (counter.decrementAndGet() == 0) { - disposable.dispose(); - emitter.complete(); + + monosListener.addListener(() -> { + AtomicLong counter = new AtomicLong(n); + BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { + @Override + public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { + emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()), + ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message))); + + if (counter.decrementAndGet() == 0) { + disposable.dispose(); + emitter.complete(); + } } - } - - @Override - public void onMessage(CharSequence channel, Object msg) { - emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg))); - - if (counter.decrementAndGet() == 0) { - disposable.dispose(); - emitter.complete(); + + @Override + public void onMessage(CharSequence channel, Object msg) { + emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg))); + + if (counter.decrementAndGet() == 0) { + disposable.dispose(); + emitter.complete(); + } } - } - }; + }; + + disposable = () -> { + for (Entry entry : channels.entrySet()) { + entry.getValue().removeListener(toChannelName(entry.getKey()), listener); + } + for (Entry entry : patterns.entrySet()) { + entry.getValue().removeListener(toChannelName(entry.getKey()), listener); + } + }; - disposable = () -> { for (Entry entry : channels.entrySet()) { - entry.getValue().removeListener(toChannelName(entry.getKey()), listener); + entry.getValue().addListener(toChannelName(entry.getKey()), listener); } + for (Entry entry : patterns.entrySet()) { - entry.getValue().removeListener(toChannelName(entry.getKey()), listener); + entry.getValue().addListener(toChannelName(entry.getKey()), listener); } - }; - - for (Entry entry : channels.entrySet()) { - entry.getValue().addListener(toChannelName(entry.getKey()), listener); - } - - for (Entry entry : patterns.entrySet()) { - entry.getValue().addListener(toChannelName(entry.getKey()), listener); - } - - emitter.onDispose(disposable); + + emitter.onDispose(disposable); + }); }); }); diff --git a/redisson-spring-data/redisson-spring-data-21/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java b/redisson-spring-data/redisson-spring-data-21/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java index f2dc24187..238da0f9e 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java +++ b/redisson-spring-data/redisson-spring-data-21/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java @@ -10,10 +10,30 @@ import org.junit.Test; import org.springframework.data.redis.connection.ReactiveRedisConnection; import org.springframework.data.redis.connection.ReactiveSubscription; +import org.springframework.data.redis.core.ReactiveStringRedisTemplate; +import org.springframework.data.redis.listener.ChannelTopic; import reactor.core.publisher.Mono; public class RedissonSubscribeReactiveTest extends BaseConnectionTest { + @Test + public void testTemplate() { + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + AtomicReference msg = new AtomicReference(); + + ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory); + template.listenTo(ChannelTopic.of("test")).flatMap(message -> { + msg.set(message.getMessage().getBytes()); + return template.delete("myobj"); + }).subscribe(); + + ReactiveRedisConnection connection = factory.getReactiveConnection(); + connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block(); + + Awaitility.await().atMost(Duration.ONE_SECOND) + .until(() -> Arrays.equals("msg".getBytes(), msg.get())); + } + @Test public void testSubscribe() { RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); diff --git a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index 7db6a0b25..adc406f63 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -47,9 +47,47 @@ import reactor.core.publisher.Mono; */ public class RedissonReactiveSubscription implements ReactiveSubscription { + public static class ListenableCounter { + + private int state; + private Runnable r; + + public synchronized void acquire() { + state++; + } + + public void release() { + synchronized (this) { + state--; + if (state != 0) { + return; + } + } + + if (r != null) { + r.run(); + r = null; + } + } + + public synchronized void addListener(Runnable r) { + synchronized (this) { + if (state != 0) { + this.r = r; + return; + } + } + + r.run(); + } + + } + private final Map channels = new ConcurrentHashMap(); private final Map patterns = new ConcurrentHashMap(); - + + private final ListenableCounter monosListener = new ListenableCounter(); + private final PublishSubscribeService subscribeService; public RedissonReactiveSubscription(ConnectionManager connectionManager) { @@ -58,14 +96,21 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public Mono subscribe(ByteBuffer... channels) { - RedissonPromise result = new RedissonPromise(); - CountableListener listener = new CountableListener(result, null, channels.length); - for (ByteBuffer channel : channels) { - RFuture f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel)); - f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(channel, res)); - f.onComplete(listener); - } - return Mono.fromFuture(result); + monosListener.acquire(); + return Mono.defer(() -> { + RedissonPromise result = new RedissonPromise(); + result.onComplete((r, ex) -> { + monosListener.release(); + }); + CountableListener listener = new CountableListener(result, null, channels.length); + for (ByteBuffer channel : channels) { + RFuture f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel)); + f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(channel, res)); + f.onComplete(listener); + } + + return Mono.fromFuture(result); + }); } protected ChannelName toChannelName(ByteBuffer channel) { @@ -74,14 +119,20 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public Mono pSubscribe(ByteBuffer... patterns) { - RedissonPromise result = new RedissonPromise(); - CountableListener listener = new CountableListener(result, null, patterns.length); - for (ByteBuffer channel : patterns) { - RFuture f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE); - f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(channel, res)); - f.onComplete(listener); - } - return Mono.fromFuture(result); + monosListener.acquire(); + return Mono.defer(() -> { + RedissonPromise result = new RedissonPromise(); + result.onComplete((r, ex) -> { + monosListener.release(); + }); + CountableListener listener = new CountableListener(result, null, patterns.length); + for (ByteBuffer channel : patterns) { + RFuture f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE); + f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(channel, res)); + f.onComplete(listener); + } + return Mono.fromFuture(result); + }); } @Override @@ -91,13 +142,28 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public Mono unsubscribe(ByteBuffer... channels) { - RedissonPromise result = new RedissonPromise(); - CountableListener listener = new CountableListener(result, null, channels.length); - for (ByteBuffer channel : channels) { - RFuture f = subscribeService.unsubscribe(toChannelName(channel), PubSubType.UNSUBSCRIBE); - f.onComplete(listener); - } - return Mono.fromFuture(result); + monosListener.acquire(); + return Mono.defer(() -> { + RedissonPromise result = new RedissonPromise(); + result.onComplete((r, ex) -> { + monosListener.release(); + }); + CountableListener listener = new CountableListener(result, null, channels.length); + for (ByteBuffer channel : channels) { + ChannelName cn = toChannelName(channel); + RFuture f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE); + f.onComplete((res, e) -> { + synchronized (RedissonReactiveSubscription.this.channels) { + PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(channel); + if (!entry.hasListeners(cn)) { + RedissonReactiveSubscription.this.channels.remove(channel); + } + } + }); + f.onComplete(listener); + } + return Mono.fromFuture(result); + }); } @Override @@ -107,13 +173,28 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public Mono pUnsubscribe(ByteBuffer... patterns) { - RedissonPromise result = new RedissonPromise(); - CountableListener listener = new CountableListener(result, null, patterns.length); - for (ByteBuffer channel : patterns) { - RFuture f = subscribeService.unsubscribe(toChannelName(channel), PubSubType.PUNSUBSCRIBE); - f.onComplete(listener); - } - return Mono.fromFuture(result); + monosListener.acquire(); + return Mono.defer(() -> { + RedissonPromise result = new RedissonPromise(); + result.onComplete((r, ex) -> { + monosListener.release(); + }); + CountableListener listener = new CountableListener(result, null, patterns.length); + for (ByteBuffer channel : patterns) { + ChannelName cn = toChannelName(channel); + RFuture f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE); + f.onComplete((res, e) -> { + synchronized (RedissonReactiveSubscription.this.patterns) { + PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(channel); + if (!entry.hasListeners(cn)) { + RedissonReactiveSubscription.this.patterns.remove(channel); + } + } + }); + f.onComplete(listener); + } + return Mono.fromFuture(result); + }); } @Override @@ -134,52 +215,54 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { if (flux.get() != null) { return flux.get(); } - + Flux> f = Flux.>create(emitter -> { emitter.onRequest(n -> { - - AtomicLong counter = new AtomicLong(n); - BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { - @Override - public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { - emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()), - ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message))); - - if (counter.decrementAndGet() == 0) { - disposable.dispose(); - emitter.complete(); + + monosListener.addListener(() -> { + AtomicLong counter = new AtomicLong(n); + BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { + @Override + public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { + emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()), + ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message))); + + if (counter.decrementAndGet() == 0) { + disposable.dispose(); + emitter.complete(); + } } - } - - @Override - public void onMessage(CharSequence channel, Object msg) { - emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg))); - - if (counter.decrementAndGet() == 0) { - disposable.dispose(); - emitter.complete(); + + @Override + public void onMessage(CharSequence channel, Object msg) { + emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg))); + + if (counter.decrementAndGet() == 0) { + disposable.dispose(); + emitter.complete(); + } } - } - }; + }; + + disposable = () -> { + for (Entry entry : channels.entrySet()) { + entry.getValue().removeListener(toChannelName(entry.getKey()), listener); + } + for (Entry entry : patterns.entrySet()) { + entry.getValue().removeListener(toChannelName(entry.getKey()), listener); + } + }; - disposable = () -> { for (Entry entry : channels.entrySet()) { - entry.getValue().removeListener(toChannelName(entry.getKey()), listener); + entry.getValue().addListener(toChannelName(entry.getKey()), listener); } + for (Entry entry : patterns.entrySet()) { - entry.getValue().removeListener(toChannelName(entry.getKey()), listener); + entry.getValue().addListener(toChannelName(entry.getKey()), listener); } - }; - - for (Entry entry : channels.entrySet()) { - entry.getValue().addListener(toChannelName(entry.getKey()), listener); - } - - for (Entry entry : patterns.entrySet()) { - entry.getValue().addListener(toChannelName(entry.getKey()), listener); - } - - emitter.onDispose(disposable); + + emitter.onDispose(disposable); + }); }); }); diff --git a/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java b/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java index f2dc24187..238da0f9e 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java +++ b/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java @@ -10,10 +10,30 @@ import org.junit.Test; import org.springframework.data.redis.connection.ReactiveRedisConnection; import org.springframework.data.redis.connection.ReactiveSubscription; +import org.springframework.data.redis.core.ReactiveStringRedisTemplate; +import org.springframework.data.redis.listener.ChannelTopic; import reactor.core.publisher.Mono; public class RedissonSubscribeReactiveTest extends BaseConnectionTest { + @Test + public void testTemplate() { + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + AtomicReference msg = new AtomicReference(); + + ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory); + template.listenTo(ChannelTopic.of("test")).flatMap(message -> { + msg.set(message.getMessage().getBytes()); + return template.delete("myobj"); + }).subscribe(); + + ReactiveRedisConnection connection = factory.getReactiveConnection(); + connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block(); + + Awaitility.await().atMost(Duration.ONE_SECOND) + .until(() -> Arrays.equals("msg".getBytes(), msg.get())); + } + @Test public void testSubscribe() { RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);