Fixed - name2entry.remove() invocation should be guarded by lock

pull/5041/head
Nikita Koksharov 2 years ago
parent 8123f6fd31
commit 94841f601b

@ -487,12 +487,13 @@ public class PublishSubscribeService {
}
private CompletableFuture<Void> unsubscribeLocked(PubSubType topicType, ChannelName channelName, MasterSlaveEntry msEntry) {
name2entry.remove(channelName);
PubSubConnectionEntry entry = name2PubSubConnection.remove(new PubSubKey(channelName, msEntry));
if (entry == null || connectionManager.getServiceManager().isShuttingDown()) {
return CompletableFuture.completedFuture(null);
}
remove(channelName, msEntry);
CompletableFuture<Void> result = new CompletableFuture<>();
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@ -516,6 +517,18 @@ public class PublishSubscribeService {
return result;
}
private void remove(ChannelName channelName, MasterSlaveEntry entry) {
Collection<MasterSlaveEntry> ee = name2entry.get(channelName);
if (ee == null) {
return;
}
ee.remove(entry);
if (ee.isEmpty()) {
name2entry.remove(channelName);
}
}
private void release(PubSubConnectionEntry entry, MasterSlaveEntry msEntry) {
entry.release();
if (entry.isFree()) {
@ -564,13 +577,14 @@ public class PublishSubscribeService {
AsyncSemaphore lock = getSemaphore(channelName);
CompletableFuture<Void> f = lock.acquire();
return f.thenCompose(v -> {
name2entry.remove(channelName);
PubSubConnectionEntry entry = name2PubSubConnection.remove(new PubSubKey(channelName, e));
if (entry == null) {
lock.release();
return CompletableFuture.completedFuture(null);
}
remove(channelName, e);
Codec entryCodec;
if (topicType == PubSubType.PUNSUBSCRIBE) {
entryCodec = entry.getConnection().getPatternChannels().get(channelName);

Loading…
Cancel
Save