From e150e5dcbc00023b26a6289ea54a706c89316644 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Sat, 25 May 2019 09:51:02 +0300 Subject: [PATCH] refactoring --- .../RedissonReactiveSubscription.java | 35 ++++++------------- 1 file changed, 10 insertions(+), 25 deletions(-) diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index e8f4b0fb7..7705a9afc 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -16,7 +16,6 @@ package org.redisson.spring.data.connection; import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -128,7 +127,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } private final AtomicReference>> flux = new AtomicReference<>(); - private Disposable disposable; + private volatile Disposable disposable; @Override public Flux> receive() { @@ -138,19 +137,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { Flux> f = Flux.>create(emitter -> { emitter.onRequest(n -> { - Map channelMap = new HashMap(); - Map patternMap = new HashMap(); - - disposable = () -> { - for (Entry entry : channelMap.entrySet()) { - PubSubConnectionEntry e = channels.get(entry.getKey()); - e.removeListener(toChannelName(entry.getKey()), entry.getValue()); - } - for (Entry entry : patternMap.entrySet()) { - PubSubConnectionEntry e = patterns.get(entry.getKey()); - e.removeListener(toChannelName(entry.getKey()), entry.getValue()); - } - }; AtomicLong counter = new AtomicLong(n); BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { @@ -175,25 +161,24 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } } }; + + disposable = () -> { + for (Entry entry : channels.entrySet()) { + entry.getValue().removeListener(toChannelName(entry.getKey()), listener); + } + for (Entry entry : patterns.entrySet()) { + entry.getValue().removeListener(toChannelName(entry.getKey()), listener); + } + }; for (Entry entry : channels.entrySet()) { - channelMap.put(entry.getKey(), listener); entry.getValue().addListener(toChannelName(entry.getKey()), listener); } for (Entry entry : patterns.entrySet()) { - patternMap.put(entry.getKey(), listener); entry.getValue().addListener(toChannelName(entry.getKey()), listener); } - disposable = () -> { - for (Entry entry : channels.entrySet()) { - entry.getValue().removeListener(toChannelName(entry.getKey()), listener); - } - for (Entry entry : patterns.entrySet()) { - entry.getValue().removeListener(toChannelName(entry.getKey()), listener); - } - }; emitter.onDispose(disposable); }); });