Fixed - RedissonReactiveSubscription.subscribe and receive methods aren't synchronized. #2686

pull/2691/head
Nikita Koksharov 5 years ago
parent 88172a2a24
commit 15c464013d

@ -47,9 +47,47 @@ import reactor.core.publisher.Mono;
*/
public class RedissonReactiveSubscription implements ReactiveSubscription {
public static class ListenableCounter {
private int state;
private Runnable r;
public synchronized void acquire() {
state++;
}
public void release() {
synchronized (this) {
state--;
if (state != 0) {
return;
}
}
if (r != null) {
r.run();
r = null;
}
}
public synchronized void addListener(Runnable r) {
synchronized (this) {
if (state != 0) {
this.r = r;
return;
}
}
r.run();
}
}
private final Map<ByteBuffer, PubSubConnectionEntry> channels = new ConcurrentHashMap<ByteBuffer, PubSubConnectionEntry>();
private final Map<ByteBuffer, PubSubConnectionEntry> patterns = new ConcurrentHashMap<ByteBuffer, PubSubConnectionEntry>();
private final ListenableCounter monosListener = new ListenableCounter();
private final PublishSubscribeService subscribeService;
public RedissonReactiveSubscription(ConnectionManager connectionManager) {
@ -58,14 +96,21 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public Mono<Void> subscribe(ByteBuffer... channels) {
RedissonPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length);
for (ByteBuffer channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel));
f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(channel, res));
f.onComplete(listener);
}
return Mono.fromFuture(result);
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length);
for (ByteBuffer channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel));
f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(channel, res));
f.onComplete(listener);
}
return Mono.fromFuture(result);
});
}
protected ChannelName toChannelName(ByteBuffer channel) {
@ -74,14 +119,20 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public Mono<Void> pSubscribe(ByteBuffer... patterns) {
RedissonPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE);
f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(channel, res));
f.onComplete(listener);
}
return Mono.fromFuture(result);
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE);
f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(channel, res));
f.onComplete(listener);
}
return Mono.fromFuture(result);
});
}
@Override
@ -91,13 +142,28 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public Mono<Void> unsubscribe(ByteBuffer... channels) {
RedissonPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length);
for (ByteBuffer channel : channels) {
RFuture<Codec> f = subscribeService.unsubscribe(toChannelName(channel), PubSubType.UNSUBSCRIBE);
f.onComplete(listener);
}
return Mono.fromFuture(result);
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length);
for (ByteBuffer channel : channels) {
ChannelName cn = toChannelName(channel);
RFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
f.onComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.channels) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(channel);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.channels.remove(channel);
}
}
});
f.onComplete(listener);
}
return Mono.fromFuture(result);
});
}
@Override
@ -107,13 +173,28 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public Mono<Void> pUnsubscribe(ByteBuffer... patterns) {
RedissonPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
RFuture<Codec> f = subscribeService.unsubscribe(toChannelName(channel), PubSubType.PUNSUBSCRIBE);
f.onComplete(listener);
}
return Mono.fromFuture(result);
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
ChannelName cn = toChannelName(channel);
RFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE);
f.onComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.patterns) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(channel);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.patterns.remove(channel);
}
}
});
f.onComplete(listener);
}
return Mono.fromFuture(result);
});
}
@Override
@ -134,52 +215,54 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
if (flux.get() != null) {
return flux.get();
}
Flux<Message<ByteBuffer, ByteBuffer>> f = Flux.<Message<ByteBuffer, ByteBuffer>>create(emitter -> {
emitter.onRequest(n -> {
AtomicLong counter = new AtomicLong(n);
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()),
ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
monosListener.addListener(() -> {
AtomicLong counter = new AtomicLong(n);
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()),
ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
}
}
}
@Override
public void onMessage(CharSequence channel, Object msg) {
emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
@Override
public void onMessage(CharSequence channel, Object msg) {
emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
}
}
}
};
};
disposable = () -> {
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
}
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
}
};
disposable = () -> {
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
entry.getValue().addListener(toChannelName(entry.getKey()), listener);
}
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
entry.getValue().addListener(toChannelName(entry.getKey()), listener);
}
};
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(toChannelName(entry.getKey()), listener);
}
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().addListener(toChannelName(entry.getKey()), listener);
}
emitter.onDispose(disposable);
emitter.onDispose(disposable);
});
});
});

@ -10,10 +10,30 @@ import org.junit.Test;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import reactor.core.publisher.Mono;
public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
@Test
public void testTemplate() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory);
template.listenTo(ChannelTopic.of("test")).flatMap(message -> {
msg.set(message.getMessage().getBytes());
return template.delete("myobj");
}).subscribe();
ReactiveRedisConnection connection = factory.getReactiveConnection();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
}
@Test
public void testSubscribe() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);

@ -47,9 +47,47 @@ import reactor.core.publisher.Mono;
*/
public class RedissonReactiveSubscription implements ReactiveSubscription {
public static class ListenableCounter {
private int state;
private Runnable r;
public synchronized void acquire() {
state++;
}
public void release() {
synchronized (this) {
state--;
if (state != 0) {
return;
}
}
if (r != null) {
r.run();
r = null;
}
}
public synchronized void addListener(Runnable r) {
synchronized (this) {
if (state != 0) {
this.r = r;
return;
}
}
r.run();
}
}
private final Map<ByteBuffer, PubSubConnectionEntry> channels = new ConcurrentHashMap<ByteBuffer, PubSubConnectionEntry>();
private final Map<ByteBuffer, PubSubConnectionEntry> patterns = new ConcurrentHashMap<ByteBuffer, PubSubConnectionEntry>();
private final ListenableCounter monosListener = new ListenableCounter();
private final PublishSubscribeService subscribeService;
public RedissonReactiveSubscription(ConnectionManager connectionManager) {
@ -58,14 +96,21 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public Mono<Void> subscribe(ByteBuffer... channels) {
RedissonPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length);
for (ByteBuffer channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel));
f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(channel, res));
f.onComplete(listener);
}
return Mono.fromFuture(result);
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length);
for (ByteBuffer channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel));
f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(channel, res));
f.onComplete(listener);
}
return Mono.fromFuture(result);
});
}
protected ChannelName toChannelName(ByteBuffer channel) {
@ -74,14 +119,20 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public Mono<Void> pSubscribe(ByteBuffer... patterns) {
RedissonPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE);
f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(channel, res));
f.onComplete(listener);
}
return Mono.fromFuture(result);
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE);
f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(channel, res));
f.onComplete(listener);
}
return Mono.fromFuture(result);
});
}
@Override
@ -91,13 +142,28 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public Mono<Void> unsubscribe(ByteBuffer... channels) {
RedissonPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length);
for (ByteBuffer channel : channels) {
RFuture<Codec> f = subscribeService.unsubscribe(toChannelName(channel), PubSubType.UNSUBSCRIBE);
f.onComplete(listener);
}
return Mono.fromFuture(result);
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length);
for (ByteBuffer channel : channels) {
ChannelName cn = toChannelName(channel);
RFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
f.onComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.channels) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(channel);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.channels.remove(channel);
}
}
});
f.onComplete(listener);
}
return Mono.fromFuture(result);
});
}
@Override
@ -107,13 +173,28 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
@Override
public Mono<Void> pUnsubscribe(ByteBuffer... patterns) {
RedissonPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
RFuture<Codec> f = subscribeService.unsubscribe(toChannelName(channel), PubSubType.PUNSUBSCRIBE);
f.onComplete(listener);
}
return Mono.fromFuture(result);
monosListener.acquire();
return Mono.defer(() -> {
RedissonPromise<Void> result = new RedissonPromise<Void>();
result.onComplete((r, ex) -> {
monosListener.release();
});
CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
ChannelName cn = toChannelName(channel);
RFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE);
f.onComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.patterns) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(channel);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.patterns.remove(channel);
}
}
});
f.onComplete(listener);
}
return Mono.fromFuture(result);
});
}
@Override
@ -134,52 +215,54 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
if (flux.get() != null) {
return flux.get();
}
Flux<Message<ByteBuffer, ByteBuffer>> f = Flux.<Message<ByteBuffer, ByteBuffer>>create(emitter -> {
emitter.onRequest(n -> {
AtomicLong counter = new AtomicLong(n);
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()),
ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
monosListener.addListener(() -> {
AtomicLong counter = new AtomicLong(n);
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()),
ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])message)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
}
}
}
@Override
public void onMessage(CharSequence channel, Object msg) {
emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
@Override
public void onMessage(CharSequence channel, Object msg) {
emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
}
}
}
};
};
disposable = () -> {
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
}
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
}
};
disposable = () -> {
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
entry.getValue().addListener(toChannelName(entry.getKey()), listener);
}
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
entry.getValue().addListener(toChannelName(entry.getKey()), listener);
}
};
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(toChannelName(entry.getKey()), listener);
}
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().addListener(toChannelName(entry.getKey()), listener);
}
emitter.onDispose(disposable);
emitter.onDispose(disposable);
});
});
});

@ -10,10 +10,30 @@ import org.junit.Test;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import reactor.core.publisher.Mono;
public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
@Test
public void testTemplate() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory);
template.listenTo(ChannelTopic.of("test")).flatMap(message -> {
msg.set(message.getMessage().getBytes());
return template.delete("myobj");
}).subscribe();
ReactiveRedisConnection connection = factory.getReactiveConnection();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
}
@Test
public void testSubscribe() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);

Loading…
Cancel
Save