|
|
|
@ -33,6 +33,7 @@ import org.redisson.connection.ClientConnectionsEntry;
|
|
|
|
|
import org.redisson.connection.ConnectionManager;
|
|
|
|
|
import org.redisson.connection.MasterSlaveEntry;
|
|
|
|
|
import org.redisson.misc.AsyncSemaphore;
|
|
|
|
|
import org.redisson.misc.Tuple;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
|
@ -49,31 +50,6 @@ import java.util.stream.Collectors;
|
|
|
|
|
*/
|
|
|
|
|
public class PublishSubscribeService {
|
|
|
|
|
|
|
|
|
|
public static class PubSubClientKey {
|
|
|
|
|
|
|
|
|
|
private final ChannelName channelName;
|
|
|
|
|
|
|
|
|
|
private final ClientConnectionsEntry entry;
|
|
|
|
|
|
|
|
|
|
public PubSubClientKey(ChannelName channelName, ClientConnectionsEntry entry) {
|
|
|
|
|
this.channelName = channelName;
|
|
|
|
|
this.entry = entry;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean equals(Object o) {
|
|
|
|
|
if (this == o) return true;
|
|
|
|
|
if (o == null || getClass() != o.getClass()) return false;
|
|
|
|
|
PubSubClientKey that = (PubSubClientKey) o;
|
|
|
|
|
return Objects.equals(channelName, that.channelName) && Objects.equals(entry, that.entry);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public int hashCode() {
|
|
|
|
|
return Objects.hash(channelName, entry);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static class PubSubKey {
|
|
|
|
|
|
|
|
|
|
private final ChannelName channelName;
|
|
|
|
@ -136,7 +112,7 @@ public class PublishSubscribeService {
|
|
|
|
|
private final Map<ChannelName, Collection<PubSubConnectionEntry>> name2entry = new ConcurrentHashMap<>();
|
|
|
|
|
private final ConcurrentMap<PubSubKey, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();
|
|
|
|
|
private final ConcurrentMap<MasterSlaveEntry, PubSubEntry> entry2PubSubConnection = new ConcurrentHashMap<>();
|
|
|
|
|
private final Map<PubSubClientKey, PubSubConnectionEntry> key2connection = new ConcurrentHashMap<>();
|
|
|
|
|
private final Map<Tuple<ChannelName, ClientConnectionsEntry>, PubSubConnectionEntry> key2connection = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
|
|
private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
|
|
|
|
|
|
|
|
|
@ -530,7 +506,7 @@ public class PublishSubscribeService {
|
|
|
|
|
PubSubType type, AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener<?>... listeners) {
|
|
|
|
|
PubSubConnectionEntry connEntry;
|
|
|
|
|
if (clientEntry != null) {
|
|
|
|
|
connEntry = key2connection.get(new PubSubClientKey(channelName, clientEntry));
|
|
|
|
|
connEntry = key2connection.get(new Tuple<>(channelName, clientEntry));
|
|
|
|
|
} else {
|
|
|
|
|
connEntry = name2PubSubConnection.get(new PubSubKey(channelName, entry));
|
|
|
|
|
}
|
|
|
|
@ -571,7 +547,7 @@ public class PublishSubscribeService {
|
|
|
|
|
|
|
|
|
|
PubSubConnectionEntry oldEntry = null;
|
|
|
|
|
if (clientEntry != null) {
|
|
|
|
|
PubSubClientKey key = new PubSubClientKey(channelName, clientEntry);
|
|
|
|
|
Tuple<ChannelName, ClientConnectionsEntry> key = new Tuple(channelName, clientEntry);
|
|
|
|
|
oldEntry = key2connection.putIfAbsent(key, freeEntry);
|
|
|
|
|
if (channelName.isTracking()) {
|
|
|
|
|
clientEntry.getTrackedConnectionsHolder().incUsage();
|
|
|
|
@ -638,7 +614,7 @@ public class PublishSubscribeService {
|
|
|
|
|
|
|
|
|
|
PubSubConnectionEntry oldEntry = null;
|
|
|
|
|
if (clientEntry != null) {
|
|
|
|
|
PubSubClientKey key = new PubSubClientKey(channelName, clientEntry);
|
|
|
|
|
Tuple<ChannelName, ClientConnectionsEntry> key = new Tuple<>(channelName, clientEntry);
|
|
|
|
|
oldEntry = key2connection.putIfAbsent(key, entry);
|
|
|
|
|
if (channelName.isTracking()) {
|
|
|
|
|
clientEntry.getTrackedConnectionsHolder().incUsage();
|
|
|
|
@ -723,7 +699,7 @@ public class PublishSubscribeService {
|
|
|
|
|
name2PubSubConnection.remove(new PubSubKey(channelName, entry.getEntry()));
|
|
|
|
|
|
|
|
|
|
ClientConnectionsEntry e = entry.getEntry().getEntry(entry.getConnection().getRedisClient());
|
|
|
|
|
PubSubClientKey key = new PubSubClientKey(channelName, e);
|
|
|
|
|
Tuple<ChannelName, ClientConnectionsEntry> key = new Tuple<>(channelName, e);
|
|
|
|
|
key2connection.remove(key);
|
|
|
|
|
if (e.getTrackedConnectionsHolder().decUsage() == 0) {
|
|
|
|
|
e.getTrackedConnectionsHolder().reset();
|
|
|
|
|