Fixed - RedissonReactiveSubscription removes listener after first 32 messages. #3667

pull/3676/head
Nikita Koksharov 4 years ago
parent db78bc9da4
commit 462eacb707

@ -37,7 +37,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -223,7 +222,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
emitter.onRequest(n -> {
monosListener.addListener(() -> {
AtomicLong counter = new AtomicLong(n);
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
@ -234,11 +232,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()),
ByteBuffer.wrap(channel.toString().getBytes()),
ByteBuffer.wrap((byte[])message)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
}
}
@Override
@ -248,11 +241,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
}
}
};

@ -37,7 +37,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -223,7 +222,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
emitter.onRequest(n -> {
monosListener.addListener(() -> {
AtomicLong counter = new AtomicLong(n);
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
@ -234,11 +232,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()),
ByteBuffer.wrap(channel.toString().getBytes()),
ByteBuffer.wrap((byte[])message)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
}
}
@Override
@ -248,11 +241,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
}
}
};

@ -37,7 +37,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -223,7 +222,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
emitter.onRequest(n -> {
monosListener.addListener(() -> {
AtomicLong counter = new AtomicLong(n);
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
@ -234,11 +232,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()),
ByteBuffer.wrap(channel.toString().getBytes()),
ByteBuffer.wrap((byte[])message)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
}
}
@Override
@ -248,11 +241,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
}
}
};

@ -37,7 +37,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -223,7 +222,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
emitter.onRequest(n -> {
monosListener.addListener(() -> {
AtomicLong counter = new AtomicLong(n);
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
@ -234,11 +232,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()),
ByteBuffer.wrap(channel.toString().getBytes()),
ByteBuffer.wrap((byte[])message)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
}
}
@Override
@ -248,11 +241,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
}
}
};

@ -223,7 +223,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
emitter.onRequest(n -> {
monosListener.addListener(() -> {
AtomicLong counter = new AtomicLong(n);
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
@ -234,11 +233,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
emitter.next(new PatternMessage<>(ByteBuffer.wrap(pattern.toString().getBytes()),
ByteBuffer.wrap(channel.toString().getBytes()),
ByteBuffer.wrap((byte[])message)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
}
}
@Override
@ -248,11 +242,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
emitter.next(new ChannelMessage<>(ByteBuffer.wrap(channel.toString().getBytes()), ByteBuffer.wrap((byte[])msg)));
if (counter.decrementAndGet() == 0) {
disposable.dispose();
emitter.complete();
}
}
};

@ -15,8 +15,31 @@ import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import reactor.core.publisher.Mono;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
@Test
public void testPubSub() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
AtomicLong counter = new AtomicLong();
ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory);
template.listenTo(ChannelTopic.of("test")).flatMap(message -> {
counter.incrementAndGet();
return Mono.empty();
}).subscribe();
for (int i = 0; i < 40; i++) {
ReactiveRedisConnection connection = factory.getReactiveConnection();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
Awaitility.await().atMost(Duration.ONE_SECOND).untilAsserted(() -> {
assertThat(counter.get()).isEqualTo(40);
});
}
@Test
public void testTemplate() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);

Loading…
Cancel
Save