|
|
@ -66,7 +66,7 @@ public class PublishSubscribeService {
|
|
|
|
|
|
|
|
|
|
|
|
private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();
|
|
|
|
private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
|
|
|
|
private final ConcurrentMap<MasterSlaveEntry, Queue<PubSubConnectionEntry>> freePubSubMap = new ConcurrentHashMap<>();
|
|
|
|
private final ConcurrentMap<MasterSlaveEntry, Queue<PubSubConnectionEntry>> entry2PubSubConnection = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
|
|
|
|
private final Queue<PubSubConnectionEntry> emptyQueue = new LinkedList<>();
|
|
|
|
private final Queue<PubSubConnectionEntry> emptyQueue = new LinkedList<>();
|
|
|
|
|
|
|
|
|
|
|
@ -219,7 +219,7 @@ public class PublishSubscribeService {
|
|
|
|
private Queue<PubSubConnectionEntry> getConnectionsQueue(ChannelName channelName) {
|
|
|
|
private Queue<PubSubConnectionEntry> getConnectionsQueue(ChannelName channelName) {
|
|
|
|
int slot = connectionManager.calcSlot(channelName.getName());
|
|
|
|
int slot = connectionManager.calcSlot(channelName.getName());
|
|
|
|
MasterSlaveEntry entry = connectionManager.getEntry(slot);
|
|
|
|
MasterSlaveEntry entry = connectionManager.getEntry(slot);
|
|
|
|
return freePubSubMap.getOrDefault(entry, emptyQueue);
|
|
|
|
return entry2PubSubConnection.getOrDefault(entry, emptyQueue);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private RFuture<Void> addListeners(ChannelName channelName, RPromise<PubSubConnectionEntry> promise,
|
|
|
|
private RFuture<Void> addListeners(ChannelName channelName, RPromise<PubSubConnectionEntry> promise,
|
|
|
@ -384,7 +384,7 @@ public class PublishSubscribeService {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public void remove(MasterSlaveEntry entry) {
|
|
|
|
public void remove(MasterSlaveEntry entry) {
|
|
|
|
freePubSubMap.remove(entry);
|
|
|
|
entry2PubSubConnection.remove(entry);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public RFuture<Codec> unsubscribe(ChannelName channelName, PubSubType topicType) {
|
|
|
|
public RFuture<Codec> unsubscribe(ChannelName channelName, PubSubType topicType) {
|
|
|
@ -497,7 +497,7 @@ public class PublishSubscribeService {
|
|
|
|
private void addFreeConnectionEntry(ChannelName channelName, PubSubConnectionEntry entry) {
|
|
|
|
private void addFreeConnectionEntry(ChannelName channelName, PubSubConnectionEntry entry) {
|
|
|
|
int slot = connectionManager.calcSlot(channelName.getName());
|
|
|
|
int slot = connectionManager.calcSlot(channelName.getName());
|
|
|
|
MasterSlaveEntry me = connectionManager.getEntry(slot);
|
|
|
|
MasterSlaveEntry me = connectionManager.getEntry(slot);
|
|
|
|
Queue<PubSubConnectionEntry> freePubSubConnections = freePubSubMap.computeIfAbsent(me, e -> new ConcurrentLinkedQueue<>());
|
|
|
|
Queue<PubSubConnectionEntry> freePubSubConnections = entry2PubSubConnection.computeIfAbsent(me, e -> new ConcurrentLinkedQueue<>());
|
|
|
|
freePubSubConnections.add(entry);
|
|
|
|
freePubSubConnections.add(entry);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -523,7 +523,7 @@ public class PublishSubscribeService {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
|
|
|
|
public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
|
|
|
|
for (Queue<PubSubConnectionEntry> queue : freePubSubMap.values()) {
|
|
|
|
for (Queue<PubSubConnectionEntry> queue : entry2PubSubConnection.values()) {
|
|
|
|
for (PubSubConnectionEntry entry : queue) {
|
|
|
|
for (PubSubConnectionEntry entry : queue) {
|
|
|
|
if (entry.getConnection().equals(redisPubSubConnection)) {
|
|
|
|
if (entry.getConnection().equals(redisPubSubConnection)) {
|
|
|
|
freePubSubLock.acquire(new Runnable() {
|
|
|
|
freePubSubLock.acquire(new Runnable() {
|
|
|
@ -602,7 +602,7 @@ public class PublishSubscribeService {
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public String toString() {
|
|
|
|
public String toString() {
|
|
|
|
return "PublishSubscribeService [name2PubSubConnection=" + name2PubSubConnection + ", freePubSubMap=" + freePubSubMap + "]";
|
|
|
|
return "PublishSubscribeService [name2PubSubConnection=" + name2PubSubConnection + ", entry2PubSubConnection=" + entry2PubSubConnection + "]";
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|