From 765d6ed1297d3b7408bea48b1d5e02f0c55b8fdb Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 24 Apr 2019 07:50:46 +0300 Subject: [PATCH] Fixed - Memory leak during Queue blocking methods invocation. #2055 --- .../redisson/command/CommandAsyncService.java | 17 +++++------------ .../redisson/connection/ConnectionManager.java | 3 ++- .../MasterSlaveConnectionManager.java | 10 ++++++---- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 462a9121c..42c647ba1 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -29,7 +29,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -88,6 +87,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.util.ReferenceCountUtil; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import io.netty.util.concurrent.FutureListener; /** * @@ -941,15 +941,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { } private void handleBlockingOperations(AsyncDetails details, RedisConnection connection, Long popTimeout) { - AtomicBoolean skip = new AtomicBoolean(); - BiConsumer listener = new BiConsumer() { - @Override - public void accept(Boolean t, Throwable u) { - if (skip.get()) { - return; - } - details.getMainPromise().tryFailure(new RedissonShutdownException("Redisson is shutdown")); - } + FutureListener listener = f -> { + details.getMainPromise().tryFailure(new RedissonShutdownException("Redisson is shutdown")); }; Timeout scheduledFuture; @@ -973,7 +966,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } synchronized (listener) { - skip.set(true); + connectionManager.getShutdownPromise().removeListener(listener); } // handling cancel operation for blocking commands @@ -992,7 +985,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { synchronized (listener) { if (!details.getMainPromise().isDone()) { - connectionManager.getShutdownPromise().onComplete(listener); + connectionManager.getShutdownPromise().addListener(listener); } } } diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index 54f7ba188..e36a13876 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -37,6 +37,7 @@ import org.redisson.pubsub.PublishSubscribeService; import io.netty.channel.EventLoopGroup; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import io.netty.util.concurrent.Future; /** * @@ -109,6 +110,6 @@ public interface ConnectionManager { InfinitySemaphoreLatch getShutdownLatch(); - RFuture getShutdownPromise(); + Future getShutdownPromise(); } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 1e3b18069..d364bd4fb 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -73,6 +73,9 @@ import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; import io.netty.util.internal.PlatformDependent; /** @@ -132,7 +135,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private final AtomicReferenceArray slot2entry = new AtomicReferenceArray<>(MAX_SLOT); private final Map client2entry = new ConcurrentHashMap<>(); - private final RPromise shutdownPromise; + private final Promise shutdownPromise = ImmediateEventExecutor.INSTANCE.newPromise(); private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch(); @@ -217,7 +220,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { this.cfg = cfg; this.codec = cfg.getCodec(); - this.shutdownPromise = new RedissonPromise(); this.commandExecutor = new CommandSyncService(this); } @@ -651,7 +653,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { timer.stop(); shutdownLatch.close(); - shutdownPromise.trySuccess(true); + shutdownPromise.trySuccess(null); shutdownLatch.awaitUninterruptibly(); if (cfg.getEventLoopGroup() == null) { @@ -691,7 +693,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public RFuture getShutdownPromise() { + public Future getShutdownPromise() { return shutdownPromise; }