Merge pull request #5038 from chltns44/master

fix: cause of memory leak name2entry
pull/5041/head
Nikita Koksharov 2 years ago committed by GitHub
commit 677e66859c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -475,8 +475,8 @@ public class PublishSubscribeService {
}
public CompletableFuture<Void> unsubscribeLocked(PubSubType topicType, ChannelName channelName) {
Collection<MasterSlaveEntry> coll = name2entry.getOrDefault(channelName, Collections.emptySet());
if (coll.isEmpty()) {
Collection<MasterSlaveEntry> coll = name2entry.remove(channelName);
if (coll == null || coll.isEmpty()) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
CompletableFuture<Void> promise = new CompletableFuture<>();
promise.completeExceptionally(ex);
@ -544,8 +544,8 @@ public class PublishSubscribeService {
}
public CompletableFuture<Codec> unsubscribe(ChannelName channelName, PubSubType topicType) {
Collection<MasterSlaveEntry> coll = name2entry.getOrDefault(channelName, Collections.emptySet());
if (coll.isEmpty()) {
Collection<MasterSlaveEntry> coll = name2entry.remove(channelName);
if (coll == null || coll.isEmpty()) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
CompletableFuture<Codec> promise = new CompletableFuture<>();
promise.completeExceptionally(ex);
@ -762,8 +762,8 @@ public class PublishSubscribeService {
}, timeout, TimeUnit.MILLISECONDS);
return sf.thenCompose(res -> {
Collection<MasterSlaveEntry> entries = name2entry.getOrDefault(channelName, Collections.emptySet());
if (entries.isEmpty()) {
Collection<MasterSlaveEntry> entries = name2entry.remove(channelName);
if (entries == null || entries.isEmpty()) {
semaphore.release();
return CompletableFuture.completedFuture(null);
}

Loading…
Cancel
Save