From b4c358ff908763816865d78a7814ec73688e137f Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 14 Apr 2020 09:41:40 +0300 Subject: [PATCH] Fixed - RedissonReactiveSubscription.subscribe and receive methods aren't synchronized. #2686 --- .../RedissonReactiveSubscription.java | 92 ++++++++++--------- .../RedissonReactiveSubscription.java | 92 ++++++++++--------- .../RedissonSubscribeReactiveTest.java | 14 ++- 3 files changed, 112 insertions(+), 86 deletions(-) diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index adc406f63..0ef458be9 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -15,14 +15,6 @@ */ package org.redisson.spring.data.connection; -import java.nio.ByteBuffer; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - import org.redisson.api.RFuture; import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.ChannelName; @@ -35,11 +27,19 @@ import org.redisson.misc.RedissonPromise; import org.redisson.pubsub.PubSubConnectionEntry; import org.redisson.pubsub.PublishSubscribeService; import org.springframework.data.redis.connection.ReactiveSubscription; - import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + /** * * @author Nikita Koksharov @@ -83,8 +83,8 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } - private final Map channels = new ConcurrentHashMap(); - private final Map patterns = new ConcurrentHashMap(); + private final Map channels = new ConcurrentHashMap<>(); + private final Map patterns = new ConcurrentHashMap<>(); private final ListenableCounter monosListener = new ListenableCounter(); @@ -98,14 +98,15 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { public Mono subscribe(ByteBuffer... channels) { monosListener.acquire(); return Mono.defer(() -> { - RedissonPromise result = new RedissonPromise(); + RedissonPromise result = new RedissonPromise<>(); result.onComplete((r, ex) -> { monosListener.release(); }); - CountableListener listener = new CountableListener(result, null, channels.length); + CountableListener listener = new CountableListener<>(result, null, channels.length); for (ByteBuffer channel : channels) { - RFuture f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel)); - f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(channel, res)); + ChannelName cn = toChannelName(channel); + RFuture f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn); + f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(cn, res)); f.onComplete(listener); } @@ -121,14 +122,15 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { public Mono pSubscribe(ByteBuffer... patterns) { monosListener.acquire(); return Mono.defer(() -> { - RedissonPromise result = new RedissonPromise(); + RedissonPromise result = new RedissonPromise<>(); result.onComplete((r, ex) -> { monosListener.release(); }); - CountableListener listener = new CountableListener(result, null, patterns.length); + CountableListener listener = new CountableListener<>(result, null, patterns.length); for (ByteBuffer channel : patterns) { - RFuture f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE); - f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(channel, res)); + ChannelName cn = toChannelName(channel); + RFuture f = subscribeService.psubscribe(cn, ByteArrayCodec.INSTANCE); + f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(cn, res)); f.onComplete(listener); } return Mono.fromFuture(result); @@ -137,26 +139,26 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public Mono unsubscribe() { - return unsubscribe(channels.keySet().toArray(new ByteBuffer[channels.size()])); + return unsubscribe(channels.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).distinct().toArray(ByteBuffer[]::new)); } @Override public Mono unsubscribe(ByteBuffer... channels) { monosListener.acquire(); return Mono.defer(() -> { - RedissonPromise result = new RedissonPromise(); + RedissonPromise result = new RedissonPromise<>(); result.onComplete((r, ex) -> { monosListener.release(); }); - CountableListener listener = new CountableListener(result, null, channels.length); + CountableListener listener = new CountableListener<>(result, null, channels.length); for (ByteBuffer channel : channels) { ChannelName cn = toChannelName(channel); RFuture f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE); f.onComplete((res, e) -> { synchronized (RedissonReactiveSubscription.this.channels) { - PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(channel); + PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(cn); if (!entry.hasListeners(cn)) { - RedissonReactiveSubscription.this.channels.remove(channel); + RedissonReactiveSubscription.this.channels.remove(cn); } } }); @@ -168,26 +170,26 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public Mono pUnsubscribe() { - return unsubscribe(patterns.keySet().toArray(new ByteBuffer[patterns.size()])); + return unsubscribe(patterns.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).distinct().toArray(ByteBuffer[]::new)); } @Override public Mono pUnsubscribe(ByteBuffer... patterns) { monosListener.acquire(); return Mono.defer(() -> { - RedissonPromise result = new RedissonPromise(); + RedissonPromise result = new RedissonPromise<>(); result.onComplete((r, ex) -> { monosListener.release(); }); - CountableListener listener = new CountableListener(result, null, patterns.length); + CountableListener listener = new CountableListener<>(result, null, patterns.length); for (ByteBuffer channel : patterns) { ChannelName cn = toChannelName(channel); RFuture f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE); f.onComplete((res, e) -> { synchronized (RedissonReactiveSubscription.this.patterns) { - PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(channel); + PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(cn); if (!entry.hasListeners(cn)) { - RedissonReactiveSubscription.this.patterns.remove(channel); + RedissonReactiveSubscription.this.patterns.remove(cn); } } }); @@ -199,12 +201,12 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public Set getChannels() { - return channels.keySet(); + return channels.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).collect(Collectors.toSet()); } @Override public Set getPatterns() { - return patterns.keySet(); + return patterns.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).collect(Collectors.toSet()); } private final AtomicReference>> flux = new AtomicReference<>(); @@ -224,8 +226,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { @Override public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { + if (!patterns.containsKey(new ChannelName(pattern.toString()))) { + return; + } + emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()), - ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message))); + ByteBuffer.wrap(channel.toString().getBytes()), + ByteBuffer.wrap((byte[])message))); if (counter.decrementAndGet() == 0) { disposable.dispose(); @@ -235,6 +242,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public void onMessage(CharSequence channel, Object msg) { + if (!channels.containsKey(new ChannelName(channel.toString()))) { + return; + } + emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg))); if (counter.decrementAndGet() == 0) { @@ -245,20 +256,19 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { }; disposable = () -> { - for (Entry entry : channels.entrySet()) { - entry.getValue().removeListener(toChannelName(entry.getKey()), listener); + for (Entry entry : channels.entrySet()) { + entry.getValue().removeListener(entry.getKey(), listener); } - for (Entry entry : patterns.entrySet()) { - entry.getValue().removeListener(toChannelName(entry.getKey()), listener); + for (Entry entry : patterns.entrySet()) { + entry.getValue().removeListener(entry.getKey(), listener); } }; - for (Entry entry : channels.entrySet()) { - entry.getValue().addListener(toChannelName(entry.getKey()), listener); + for (Entry entry : channels.entrySet()) { + entry.getValue().addListener(entry.getKey(), listener); } - - for (Entry entry : patterns.entrySet()) { - entry.getValue().addListener(toChannelName(entry.getKey()), listener); + for (Entry entry : patterns.entrySet()) { + entry.getValue().addListener(entry.getKey(), listener); } emitter.onDispose(disposable); diff --git a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index adc406f63..0ef458be9 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -15,14 +15,6 @@ */ package org.redisson.spring.data.connection; -import java.nio.ByteBuffer; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - import org.redisson.api.RFuture; import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.ChannelName; @@ -35,11 +27,19 @@ import org.redisson.misc.RedissonPromise; import org.redisson.pubsub.PubSubConnectionEntry; import org.redisson.pubsub.PublishSubscribeService; import org.springframework.data.redis.connection.ReactiveSubscription; - import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + /** * * @author Nikita Koksharov @@ -83,8 +83,8 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } - private final Map channels = new ConcurrentHashMap(); - private final Map patterns = new ConcurrentHashMap(); + private final Map channels = new ConcurrentHashMap<>(); + private final Map patterns = new ConcurrentHashMap<>(); private final ListenableCounter monosListener = new ListenableCounter(); @@ -98,14 +98,15 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { public Mono subscribe(ByteBuffer... channels) { monosListener.acquire(); return Mono.defer(() -> { - RedissonPromise result = new RedissonPromise(); + RedissonPromise result = new RedissonPromise<>(); result.onComplete((r, ex) -> { monosListener.release(); }); - CountableListener listener = new CountableListener(result, null, channels.length); + CountableListener listener = new CountableListener<>(result, null, channels.length); for (ByteBuffer channel : channels) { - RFuture f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel)); - f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(channel, res)); + ChannelName cn = toChannelName(channel); + RFuture f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn); + f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(cn, res)); f.onComplete(listener); } @@ -121,14 +122,15 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { public Mono pSubscribe(ByteBuffer... patterns) { monosListener.acquire(); return Mono.defer(() -> { - RedissonPromise result = new RedissonPromise(); + RedissonPromise result = new RedissonPromise<>(); result.onComplete((r, ex) -> { monosListener.release(); }); - CountableListener listener = new CountableListener(result, null, patterns.length); + CountableListener listener = new CountableListener<>(result, null, patterns.length); for (ByteBuffer channel : patterns) { - RFuture f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE); - f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(channel, res)); + ChannelName cn = toChannelName(channel); + RFuture f = subscribeService.psubscribe(cn, ByteArrayCodec.INSTANCE); + f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(cn, res)); f.onComplete(listener); } return Mono.fromFuture(result); @@ -137,26 +139,26 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public Mono unsubscribe() { - return unsubscribe(channels.keySet().toArray(new ByteBuffer[channels.size()])); + return unsubscribe(channels.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).distinct().toArray(ByteBuffer[]::new)); } @Override public Mono unsubscribe(ByteBuffer... channels) { monosListener.acquire(); return Mono.defer(() -> { - RedissonPromise result = new RedissonPromise(); + RedissonPromise result = new RedissonPromise<>(); result.onComplete((r, ex) -> { monosListener.release(); }); - CountableListener listener = new CountableListener(result, null, channels.length); + CountableListener listener = new CountableListener<>(result, null, channels.length); for (ByteBuffer channel : channels) { ChannelName cn = toChannelName(channel); RFuture f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE); f.onComplete((res, e) -> { synchronized (RedissonReactiveSubscription.this.channels) { - PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(channel); + PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(cn); if (!entry.hasListeners(cn)) { - RedissonReactiveSubscription.this.channels.remove(channel); + RedissonReactiveSubscription.this.channels.remove(cn); } } }); @@ -168,26 +170,26 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public Mono pUnsubscribe() { - return unsubscribe(patterns.keySet().toArray(new ByteBuffer[patterns.size()])); + return unsubscribe(patterns.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).distinct().toArray(ByteBuffer[]::new)); } @Override public Mono pUnsubscribe(ByteBuffer... patterns) { monosListener.acquire(); return Mono.defer(() -> { - RedissonPromise result = new RedissonPromise(); + RedissonPromise result = new RedissonPromise<>(); result.onComplete((r, ex) -> { monosListener.release(); }); - CountableListener listener = new CountableListener(result, null, patterns.length); + CountableListener listener = new CountableListener<>(result, null, patterns.length); for (ByteBuffer channel : patterns) { ChannelName cn = toChannelName(channel); RFuture f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE); f.onComplete((res, e) -> { synchronized (RedissonReactiveSubscription.this.patterns) { - PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(channel); + PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(cn); if (!entry.hasListeners(cn)) { - RedissonReactiveSubscription.this.patterns.remove(channel); + RedissonReactiveSubscription.this.patterns.remove(cn); } } }); @@ -199,12 +201,12 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public Set getChannels() { - return channels.keySet(); + return channels.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).collect(Collectors.toSet()); } @Override public Set getPatterns() { - return patterns.keySet(); + return patterns.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).collect(Collectors.toSet()); } private final AtomicReference>> flux = new AtomicReference<>(); @@ -224,8 +226,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { @Override public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { + if (!patterns.containsKey(new ChannelName(pattern.toString()))) { + return; + } + emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()), - ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message))); + ByteBuffer.wrap(channel.toString().getBytes()), + ByteBuffer.wrap((byte[])message))); if (counter.decrementAndGet() == 0) { disposable.dispose(); @@ -235,6 +242,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { @Override public void onMessage(CharSequence channel, Object msg) { + if (!channels.containsKey(new ChannelName(channel.toString()))) { + return; + } + emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg))); if (counter.decrementAndGet() == 0) { @@ -245,20 +256,19 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { }; disposable = () -> { - for (Entry entry : channels.entrySet()) { - entry.getValue().removeListener(toChannelName(entry.getKey()), listener); + for (Entry entry : channels.entrySet()) { + entry.getValue().removeListener(entry.getKey(), listener); } - for (Entry entry : patterns.entrySet()) { - entry.getValue().removeListener(toChannelName(entry.getKey()), listener); + for (Entry entry : patterns.entrySet()) { + entry.getValue().removeListener(entry.getKey(), listener); } }; - for (Entry entry : channels.entrySet()) { - entry.getValue().addListener(toChannelName(entry.getKey()), listener); + for (Entry entry : channels.entrySet()) { + entry.getValue().addListener(entry.getKey(), listener); } - - for (Entry entry : patterns.entrySet()) { - entry.getValue().addListener(toChannelName(entry.getKey()), listener); + for (Entry entry : patterns.entrySet()) { + entry.getValue().addListener(entry.getKey(), listener); } emitter.onDispose(disposable); diff --git a/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java b/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java index 238da0f9e..58b8b5f41 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java +++ b/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java @@ -2,6 +2,7 @@ package org.redisson.spring.data.connection; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.awaitility.Awaitility; @@ -19,19 +20,24 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest { @Test public void testTemplate() { RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); - AtomicReference msg = new AtomicReference(); + AtomicLong counter = new AtomicLong(); ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory); template.listenTo(ChannelTopic.of("test")).flatMap(message -> { - msg.set(message.getMessage().getBytes()); - return template.delete("myobj"); + counter.incrementAndGet(); + return Mono.empty(); + }).subscribe(); + + template.listenTo(ChannelTopic.of("test2")).flatMap(message -> { + counter.incrementAndGet(); + return Mono.empty(); }).subscribe(); ReactiveRedisConnection connection = factory.getReactiveConnection(); connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block(); Awaitility.await().atMost(Duration.ONE_SECOND) - .until(() -> Arrays.equals("msg".getBytes(), msg.get())); + .until(() -> counter.get() == 1); } @Test