Fixed - Memory leak during Queue blocking methods invocation. #2055

pull/2070/head
Nikita Koksharov 6 years ago
parent a056a200ca
commit 765d6ed129

@ -29,7 +29,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -88,6 +87,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FutureListener;
/**
*
@ -941,15 +941,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
private <R, V> void handleBlockingOperations(AsyncDetails<V, R> details, RedisConnection connection, Long popTimeout) {
AtomicBoolean skip = new AtomicBoolean();
BiConsumer<Boolean, Throwable> listener = new BiConsumer<Boolean, Throwable>() {
@Override
public void accept(Boolean t, Throwable u) {
if (skip.get()) {
return;
}
details.getMainPromise().tryFailure(new RedissonShutdownException("Redisson is shutdown"));
}
FutureListener<Void> listener = f -> {
details.getMainPromise().tryFailure(new RedissonShutdownException("Redisson is shutdown"));
};
Timeout scheduledFuture;
@ -973,7 +966,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
synchronized (listener) {
skip.set(true);
connectionManager.getShutdownPromise().removeListener(listener);
}
// handling cancel operation for blocking commands
@ -992,7 +985,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
synchronized (listener) {
if (!details.getMainPromise().isDone()) {
connectionManager.getShutdownPromise().onComplete(listener);
connectionManager.getShutdownPromise().addListener(listener);
}
}
}

@ -37,6 +37,7 @@ import org.redisson.pubsub.PublishSubscribeService;
import io.netty.channel.EventLoopGroup;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
/**
*
@ -109,6 +110,6 @@ public interface ConnectionManager {
InfinitySemaphoreLatch getShutdownLatch();
RFuture<Boolean> getShutdownPromise();
Future<Void> getShutdownPromise();
}

@ -73,6 +73,9 @@ import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
/**
@ -132,7 +135,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final AtomicReferenceArray<MasterSlaveEntry> slot2entry = new AtomicReferenceArray<>(MAX_SLOT);
private final Map<RedisClient, MasterSlaveEntry> client2entry = new ConcurrentHashMap<>();
private final RPromise<Boolean> shutdownPromise;
private final Promise<Void> shutdownPromise = ImmediateEventExecutor.INSTANCE.newPromise();
private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch();
@ -217,7 +220,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.cfg = cfg;
this.codec = cfg.getCodec();
this.shutdownPromise = new RedissonPromise<Boolean>();
this.commandExecutor = new CommandSyncService(this);
}
@ -651,7 +653,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
timer.stop();
shutdownLatch.close();
shutdownPromise.trySuccess(true);
shutdownPromise.trySuccess(null);
shutdownLatch.awaitUninterruptibly();
if (cfg.getEventLoopGroup() == null) {
@ -691,7 +693,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public RFuture<Boolean> getShutdownPromise() {
public Future<Void> getShutdownPromise() {
return shutdownPromise;
}

Loading…
Cancel
Save