refactoring

pull/3215/merge
Nikita Koksharov 3 years ago
parent 037fbc76a0
commit 732b28fade

@ -15,29 +15,26 @@
*/
package org.redisson.pubsub;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.redisson.PubSubMessageListener;
import org.redisson.PubSubPatternMessageListener;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.SubscribeListener;
import org.redisson.client.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubType;
import io.netty.channel.ChannelFuture;
import org.redisson.connection.ConnectionManager;
import java.util.EventListener;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* @author Nikita Koksharov
@ -184,11 +181,14 @@ public class PubSubConnectionEntry {
return listener;
}
public void unsubscribe(PubSubType commandType, ChannelName channel, RedisPubSubListener<?> listener, AtomicBoolean executed) {
public void unsubscribe(PubSubType commandType, ChannelName channel, RedisPubSubListener<?> listener) {
AtomicBoolean executed = new AtomicBoolean();
conn.addListener(new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, CharSequence ch) {
if (type == commandType && channel.equals(ch)) {
executed.set(true);
conn.removeListener(this);
removeListeners(channel);
if (listener != null) {

@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -459,15 +458,12 @@ public class PublishSubscribeService {
return CompletableFuture.completedFuture(null);
}
AtomicBoolean executed = new AtomicBoolean();
CompletableFuture<Void> result = new CompletableFuture<>();
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, CharSequence channel) {
if (type == topicType && channel.equals(channelName)) {
executed.set(true);
if (entry.release() == 1) {
MasterSlaveEntry msEntry = getEntry(channelName);
msEntry.returnPubSubConnection(entry.getConnection());
@ -481,7 +477,7 @@ public class PublishSubscribeService {
};
entry.unsubscribe(topicType, channelName, listener, executed);
entry.unsubscribe(topicType, channelName, listener);
return result;
}
@ -521,14 +517,11 @@ public class PublishSubscribeService {
entryCodec = entry.getConnection().getChannels().get(channelName);
}
AtomicBoolean executed = new AtomicBoolean();
RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, CharSequence channel) {
if (type == topicType && channel.equals(channelName)) {
executed.set(true);
lock.release();
result.complete(entryCodec);
return true;
@ -538,7 +531,7 @@ public class PublishSubscribeService {
};
entry.unsubscribe(topicType, channelName, listener, executed);
entry.unsubscribe(topicType, channelName, listener);
});
});

Loading…
Cancel
Save