Fixed - RedissonReactiveSubscription.subscribe and receive methods aren't synchronized. #2686

pull/2694/head
Nikita Koksharov 5 years ago
parent 4eb39a210e
commit b4c358ff90

@ -15,14 +15,6 @@
*/
package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
@ -35,11 +27,19 @@ import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.springframework.data.redis.connection.ReactiveSubscription;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
*
* @author Nikita Koksharov
@ -83,8 +83,8 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ByteBuffer, PubSubConnectionEntry> channels = new ConcurrentHashMap<ByteBuffer, PubSubConnectionEntry>();
private final Map<ByteBuffer, PubSubConnectionEntry> patterns = new ConcurrentHashMap<ByteBuffer, PubSubConnectionEntry>();
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, PubSubConnectionEntry> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
@ -98,14 +98,15 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
public Mono<Void> subscribe(ByteBuffer... channels) {
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
RedissonPromise<Void> result = new RedissonPromise<>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length);
CountableListener<Void> listener = new CountableListener<>(result, null, channels.length);
for (ByteBuffer channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel));
f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(channel, res));
ChannelName cn = toChannelName(channel);
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn);
f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(cn, res));
f.onComplete(listener);
}
@ -121,14 +122,15 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
public Mono<Void> pSubscribe(ByteBuffer... patterns) {
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
RedissonPromise<Void> result = new RedissonPromise<>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length);
CountableListener<Void> listener = new CountableListener<>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE);
f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(channel, res));
ChannelName cn = toChannelName(channel);
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(cn, ByteArrayCodec.INSTANCE);
f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(cn, res));
f.onComplete(listener);
}
return Mono.fromFuture(result);
@ -137,26 +139,26 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public Mono<Void> unsubscribe() {
return unsubscribe(channels.keySet().toArray(new ByteBuffer[channels.size()]));
return unsubscribe(channels.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).distinct().toArray(ByteBuffer[]::new));
}
@Override
public Mono<Void> unsubscribe(ByteBuffer... channels) {
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
RedissonPromise<Void> result = new RedissonPromise<>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length);
CountableListener<Void> listener = new CountableListener<>(result, null, channels.length);
for (ByteBuffer channel : channels) {
ChannelName cn = toChannelName(channel);
RFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
f.onComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.channels) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(channel);
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.channels.remove(channel);
RedissonReactiveSubscription.this.channels.remove(cn);
}
}
});
@ -168,26 +170,26 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public Mono<Void> pUnsubscribe() {
return unsubscribe(patterns.keySet().toArray(new ByteBuffer[patterns.size()]));
return unsubscribe(patterns.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).distinct().toArray(ByteBuffer[]::new));
}
@Override
public Mono<Void> pUnsubscribe(ByteBuffer... patterns) {
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
RedissonPromise<Void> result = new RedissonPromise<>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length);
CountableListener<Void> listener = new CountableListener<>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
ChannelName cn = toChannelName(channel);
RFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE);
f.onComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.patterns) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(channel);
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.patterns.remove(channel);
RedissonReactiveSubscription.this.patterns.remove(cn);
}
}
});
@ -199,12 +201,12 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public Set<ByteBuffer> getChannels() {
return channels.keySet();
return channels.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).collect(Collectors.toSet());
}
@Override
public Set<ByteBuffer> getPatterns() {
return patterns.keySet();
return patterns.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).collect(Collectors.toSet());
}
private final AtomicReference<Flux<Message<ByteBuffer, ByteBuffer>>> flux = new AtomicReference<>();
@ -224,8 +226,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
if (!patterns.containsKey(new ChannelName(pattern.toString()))) {
return;
}
emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()),
ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message)));
ByteBuffer.wrap(channel.toString().getBytes()),
ByteBuffer.wrap((byte[])message)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
@ -235,6 +242,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public void onMessage(CharSequence channel, Object msg) {
if (!channels.containsKey(new ChannelName(channel.toString()))) {
return;
}
emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg)));
if (counter.decrementAndGet() == 0) {
@ -245,20 +256,19 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
};
disposable = () -> {
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
}
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
for (Entry<ChannelName, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
}
};
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(toChannelName(entry.getKey()), listener);
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
}
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().addListener(toChannelName(entry.getKey()), listener);
for (Entry<ChannelName, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
}
emitter.onDispose(disposable);

@ -15,14 +15,6 @@
*/
package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
@ -35,11 +27,19 @@ import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.springframework.data.redis.connection.ReactiveSubscription;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
*
* @author Nikita Koksharov
@ -83,8 +83,8 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ByteBuffer, PubSubConnectionEntry> channels = new ConcurrentHashMap<ByteBuffer, PubSubConnectionEntry>();
private final Map<ByteBuffer, PubSubConnectionEntry> patterns = new ConcurrentHashMap<ByteBuffer, PubSubConnectionEntry>();
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, PubSubConnectionEntry> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
@ -98,14 +98,15 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
public Mono<Void> subscribe(ByteBuffer... channels) {
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
RedissonPromise<Void> result = new RedissonPromise<>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length);
CountableListener<Void> listener = new CountableListener<>(result, null, channels.length);
for (ByteBuffer channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel));
f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(channel, res));
ChannelName cn = toChannelName(channel);
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn);
f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(cn, res));
f.onComplete(listener);
}
@ -121,14 +122,15 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
public Mono<Void> pSubscribe(ByteBuffer... patterns) {
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
RedissonPromise<Void> result = new RedissonPromise<>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length);
CountableListener<Void> listener = new CountableListener<>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE);
f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(channel, res));
ChannelName cn = toChannelName(channel);
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(cn, ByteArrayCodec.INSTANCE);
f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(cn, res));
f.onComplete(listener);
}
return Mono.fromFuture(result);
@ -137,26 +139,26 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public Mono<Void> unsubscribe() {
return unsubscribe(channels.keySet().toArray(new ByteBuffer[channels.size()]));
return unsubscribe(channels.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).distinct().toArray(ByteBuffer[]::new));
}
@Override
public Mono<Void> unsubscribe(ByteBuffer... channels) {
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
RedissonPromise<Void> result = new RedissonPromise<>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length);
CountableListener<Void> listener = new CountableListener<>(result, null, channels.length);
for (ByteBuffer channel : channels) {
ChannelName cn = toChannelName(channel);
RFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
f.onComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.channels) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(channel);
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.channels.remove(channel);
RedissonReactiveSubscription.this.channels.remove(cn);
}
}
});
@ -168,26 +170,26 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public Mono<Void> pUnsubscribe() {
return unsubscribe(patterns.keySet().toArray(new ByteBuffer[patterns.size()]));
return unsubscribe(patterns.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).distinct().toArray(ByteBuffer[]::new));
}
@Override
public Mono<Void> pUnsubscribe(ByteBuffer... patterns) {
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
RedissonPromise<Void> result = new RedissonPromise<>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length);
CountableListener<Void> listener = new CountableListener<>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
ChannelName cn = toChannelName(channel);
RFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE);
f.onComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.patterns) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(channel);
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.patterns.remove(channel);
RedissonReactiveSubscription.this.patterns.remove(cn);
}
}
});
@ -199,12 +201,12 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public Set<ByteBuffer> getChannels() {
return channels.keySet();
return channels.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).collect(Collectors.toSet());
}
@Override
public Set<ByteBuffer> getPatterns() {
return patterns.keySet();
return patterns.keySet().stream().map(b -> ByteBuffer.wrap(b.getName())).collect(Collectors.toSet());
}
private final AtomicReference<Flux<Message<ByteBuffer, ByteBuffer>>> flux = new AtomicReference<>();
@ -224,8 +226,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
if (!patterns.containsKey(new ChannelName(pattern.toString()))) {
return;
}
emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()),
ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message)));
ByteBuffer.wrap(channel.toString().getBytes()),
ByteBuffer.wrap((byte[])message)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
@ -235,6 +242,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public void onMessage(CharSequence channel, Object msg) {
if (!channels.containsKey(new ChannelName(channel.toString()))) {
return;
}
emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg)));
if (counter.decrementAndGet() == 0) {
@ -245,20 +256,19 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
};
disposable = () -> {
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
}
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
for (Entry<ChannelName, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
}
};
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(toChannelName(entry.getKey()), listener);
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
}
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().addListener(toChannelName(entry.getKey()), listener);
for (Entry<ChannelName, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
}
emitter.onDispose(disposable);

@ -2,6 +2,7 @@ package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
@ -19,19 +20,24 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
@Test
public void testTemplate() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
AtomicLong counter = new AtomicLong();
ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory);
template.listenTo(ChannelTopic.of("test")).flatMap(message -> {
msg.set(message.getMessage().getBytes());
return template.delete("myobj");
counter.incrementAndGet();
return Mono.empty();
}).subscribe();
template.listenTo(ChannelTopic.of("test2")).flatMap(message -> {
counter.incrementAndGet();
return Mono.empty();
}).subscribe();
ReactiveRedisConnection connection = factory.getReactiveConnection();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
.until(() -> counter.get() == 1);
}
@Test

Loading…
Cancel
Save