refactoring

pull/3215/merge
Nikita Koksharov 3 years ago
parent 98370cd597
commit 9e58872b8c

@ -17,7 +17,6 @@ package org.redisson.client;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandData;
@ -26,7 +25,10 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.pubsub.*; import org.redisson.client.protocol.pubsub.*;
import java.util.*; import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
@ -38,11 +40,11 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/ */
public class RedisPubSubConnection extends RedisConnection { public class RedisPubSubConnection extends RedisConnection {
final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>(); final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<>();
final Map<ChannelName, Codec> channels = new ConcurrentHashMap<>(); final Map<ChannelName, Codec> channels = new ConcurrentHashMap<>();
final Map<ChannelName, Codec> patternChannels = new ConcurrentHashMap<>(); final Map<ChannelName, Codec> patternChannels = new ConcurrentHashMap<>();
final Set<ChannelName> unsubscibedChannels = new HashSet<ChannelName>(); final Set<ChannelName> unsubscibedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
final Set<ChannelName> punsubscibedChannels = new HashSet<ChannelName>(); final Set<ChannelName> punsubscibedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
public RedisPubSubConnection(RedisClient redisClient, Channel channel, CompletableFuture<RedisPubSubConnection> connectionPromise) { public RedisPubSubConnection(RedisClient redisClient, Channel channel, CompletableFuture<RedisPubSubConnection> connectionPromise) {
super(redisClient, channel, connectionPromise); super(redisClient, channel, connectionPromise);
@ -88,22 +90,28 @@ public class RedisPubSubConnection extends RedisConnection {
return async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channels); return async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channels);
} }
public ChannelFuture unsubscribe(ChannelName... channels) { public ChannelFuture unsubscribe(PubSubType type, ChannelName... channels) {
synchronized (this) { RedisCommand<Object> command;
if (type == PubSubType.UNSUBSCRIBE) {
command = RedisCommands.UNSUBSCRIBE;
for (ChannelName ch : channels) { for (ChannelName ch : channels) {
this.channels.remove(ch); this.channels.remove(ch);
unsubscibedChannels.add(ch); unsubscibedChannels.add(ch);
} }
} else {
command = RedisCommands.PUNSUBSCRIBE;
for (ChannelName ch : channels) {
patternChannels.remove(ch);
punsubscibedChannels.add(ch);
} }
ChannelFuture future = async((MultiDecoder) null, RedisCommands.UNSUBSCRIBE, channels); }
future.addListener(new FutureListener<Void>() {
@Override ChannelFuture future = async((MultiDecoder) null, command, channels);
public void operationComplete(Future<Void> future) throws Exception { future.addListener((FutureListener<Void>) f -> {
if (!future.isSuccess()) { if (!f.isSuccess()) {
for (ChannelName channel : channels) { for (ChannelName channel : channels) {
removeDisconnectListener(channel); removeDisconnectListener(channel);
onMessage(new PubSubStatusMessage(PubSubType.UNSUBSCRIBE, channel)); onMessage(new PubSubStatusMessage(type, channel));
}
} }
} }
}); });
@ -111,52 +119,22 @@ public class RedisPubSubConnection extends RedisConnection {
} }
public void removeDisconnectListener(ChannelName channel) { public void removeDisconnectListener(ChannelName channel) {
synchronized (this) {
unsubscibedChannels.remove(channel); unsubscibedChannels.remove(channel);
punsubscibedChannels.remove(channel); punsubscibedChannels.remove(channel);
} }
}
@Override @Override
public void fireDisconnected() { public void fireDisconnected() {
super.fireDisconnected(); super.fireDisconnected();
Set<ChannelName> channels = new HashSet<ChannelName>(); for (ChannelName channel : unsubscibedChannels) {
Set<ChannelName> pchannels = new HashSet<ChannelName>();
synchronized (this) {
channels.addAll(unsubscibedChannels);
pchannels.addAll(punsubscibedChannels);
}
for (ChannelName channel : channels) {
onMessage(new PubSubStatusMessage(PubSubType.UNSUBSCRIBE, channel)); onMessage(new PubSubStatusMessage(PubSubType.UNSUBSCRIBE, channel));
} }
for (ChannelName channel : pchannels) { for (ChannelName channel : punsubscibedChannels) {
onMessage(new PubSubStatusMessage(PubSubType.PUNSUBSCRIBE, channel)); onMessage(new PubSubStatusMessage(PubSubType.PUNSUBSCRIBE, channel));
} }
} }
public ChannelFuture punsubscribe(ChannelName... channels) {
synchronized (this) {
for (ChannelName ch : channels) {
patternChannels.remove(ch);
punsubscibedChannels.add(ch);
}
}
ChannelFuture future = async((MultiDecoder) null, RedisCommands.PUNSUBSCRIBE, channels);
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
for (ChannelName channel : channels) {
removeDisconnectListener(channel);
onMessage(new PubSubStatusMessage(PubSubType.PUNSUBSCRIBE, channel));
}
}
}
});
return future;
}
private <T, R> ChannelFuture async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object... params) { private <T, R> ChannelFuture async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object... params) {
CompletableFuture<R> promise = new CompletableFuture<>(); CompletableFuture<R> promise = new CompletableFuture<>();
return channel.writeAndFlush(new CommandData<>(promise, messageDecoder, null, command, params)); return channel.writeAndFlush(new CommandData<>(promise, messageDecoder, null, command, params));

@ -19,8 +19,11 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.ChannelFutureListener;
import org.redisson.PubSubMessageListener; import org.redisson.PubSubMessageListener;
import org.redisson.PubSubPatternMessageListener; import org.redisson.PubSubPatternMessageListener;
import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.BaseRedisPubSubListener;
@ -29,9 +32,11 @@ import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisPubSubListener;
import org.redisson.client.SubscribeListener; import org.redisson.client.SubscribeListener;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.client.protocol.pubsub.PubSubType;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import org.redisson.connection.ConnectionManager;
/** /**
* *
@ -48,10 +53,13 @@ public class PubSubConnectionEntry {
private static final Queue<RedisPubSubListener<?>> EMPTY_QUEUE = new LinkedList<>(); private static final Queue<RedisPubSubListener<?>> EMPTY_QUEUE = new LinkedList<>();
public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) { private final ConnectionManager connectionManager;
public PubSubConnectionEntry(RedisPubSubConnection conn, ConnectionManager connectionManager) {
super(); super();
this.conn = conn; this.conn = conn;
this.subscribedChannelsAmount = new AtomicInteger(subscriptionsPerConnection); this.connectionManager = connectionManager;
this.subscribedChannelsAmount = new AtomicInteger(connectionManager.getConfig().getSubscriptionsPerConnection());
} }
public int countListeners(ChannelName channelName) { public int countListeners(ChannelName channelName) {
@ -176,11 +184,11 @@ public class PubSubConnectionEntry {
return listener; return listener;
} }
public ChannelFuture unsubscribe(ChannelName channel, RedisPubSubListener<?> listener) { public void unsubscribe(PubSubType commandType, ChannelName channel, RedisPubSubListener<?> listener, AtomicBoolean executed) {
conn.addListener(new BaseRedisPubSubListener() { conn.addListener(new BaseRedisPubSubListener() {
@Override @Override
public boolean onStatus(PubSubType type, CharSequence ch) { public boolean onStatus(PubSubType type, CharSequence ch) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(ch)) { if (type == commandType && channel.equals(ch)) {
conn.removeListener(this); conn.removeListener(this);
removeListeners(channel); removeListeners(channel);
if (listener != null) { if (listener != null) {
@ -190,9 +198,21 @@ public class PubSubConnectionEntry {
} }
return false; return false;
} }
});
ChannelFuture future = conn.unsubscribe(commandType, channel);
future.addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
return;
}
connectionManager.newTimeout(timeout -> {
if (executed.get()) {
return;
}
conn.onMessage(new PubSubStatusMessage(commandType, channel));
}, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
}); });
return conn.unsubscribe(channel);
} }
private void removeListeners(ChannelName channel) { private void removeListeners(ChannelName channel) {
@ -210,24 +230,6 @@ public class PubSubConnectionEntry {
} }
} }
public ChannelFuture punsubscribe(final ChannelName channel, final RedisPubSubListener<?> listener) {
conn.addListener(new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, CharSequence ch) {
if (type == PubSubType.PUNSUBSCRIBE && channel.equals(ch)) {
conn.removeListener(this);
removeListeners(channel);
if (listener != null) {
listener.onStatus(type, ch);
}
return true;
}
return false;
}
});
return conn.punsubscribe(channel);
}
public RedisPubSubConnection getConnection() { public RedisPubSubConnection getConnection() {
return conn; return conn;
} }

@ -21,7 +21,6 @@ import io.netty.util.Timeout;
import org.redisson.PubSubPatternStatusListener; import org.redisson.PubSubPatternStatusListener;
import org.redisson.client.*; import org.redisson.client.*;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
@ -409,7 +408,7 @@ public class PublishSubscribeService {
} }
freePubSubLock.acquire(() -> { freePubSubLock.acquire(() -> {
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, connectionManager);
int remainFreeAmount = entry.tryAcquire(); int remainFreeAmount = entry.tryAcquire();
PubSubKey key = new PubSubKey(channelName, msEntry); PubSubKey key = new PubSubKey(channelName, msEntry);
@ -482,26 +481,7 @@ public class PublishSubscribeService {
}; };
ChannelFuture future; entry.unsubscribe(topicType, channelName, listener, executed);
if (topicType == PubSubType.UNSUBSCRIBE) {
future = entry.unsubscribe(channelName, listener);
} else {
future = entry.punsubscribe(channelName, listener);
}
future.addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
return;
}
connectionManager.newTimeout(timeout -> {
if (executed.get()) {
return;
}
entry.getConnection().onMessage(new PubSubStatusMessage(topicType, channelName));
}, config.getTimeout(), TimeUnit.MILLISECONDS);
});
return result; return result;
} }
@ -558,25 +538,7 @@ public class PublishSubscribeService {
}; };
ChannelFuture future; entry.unsubscribe(topicType, channelName, listener, executed);
if (topicType == PubSubType.PUNSUBSCRIBE) {
future = entry.punsubscribe(channelName, listener);
} else {
future = entry.unsubscribe(channelName, listener);
}
future.addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
return;
}
connectionManager.newTimeout(timeout -> {
if (executed.get()) {
return;
}
entry.getConnection().onMessage(new PubSubStatusMessage(topicType, channelName));
}, config.getTimeout(), TimeUnit.MILLISECONDS);
});
}); });
}); });

Loading…
Cancel
Save