diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index 23b0abc10..482e9d5f7 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -61,6 +61,7 @@ import org.redisson.config.Protocol; import org.redisson.config.TransportMode; import org.redisson.liveobject.resolver.MapResolver; import org.redisson.misc.CompletableFutureWrapper; +import org.redisson.misc.FastRemovalQueue; import org.redisson.misc.RandomXoshiro256PlusPlus; import org.redisson.misc.RedisURI; import org.redisson.remote.ResponseEntry; @@ -82,6 +83,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; /** * @@ -375,19 +378,22 @@ public final class ServiceManager { return socketChannelClass; } - private final AtomicInteger lastFuturesCounter = new AtomicInteger(); - private final Deque> lastFutures = new ConcurrentLinkedDeque<>(); + private final FastRemovalQueue> lastFutures = new FastRemovalQueue<>(); public void addFuture(CompletableFuture future) { - lastFutures.addLast(future); - if (lastFuturesCounter.incrementAndGet() > 100) { - lastFutures.pollFirst(); - lastFuturesCounter.decrementAndGet(); + lastFutures.add(future); + future.whenComplete((r, e) -> { + lastFutures.remove(future); + }); + + if (lastFutures.size() > 100) { + lastFutures.poll(); } } public void shutdownFutures(long timeout, TimeUnit unit) { - CompletableFuture future = CompletableFuture.allOf(lastFutures.toArray(new CompletableFuture[0])); + Stream> stream = StreamSupport.stream(lastFutures.spliterator(), false); + CompletableFuture future = CompletableFuture.allOf(stream.toArray(CompletableFuture[]::new)); try { future.get(timeout, unit); } catch (Exception e) { diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index 44deea822..c731d8bbf 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -944,6 +944,11 @@ public class RedissonTest extends RedisDockerTest { long quietPeriod = TimeUnit.MILLISECONDS.toMillis(50); long timeOut = quietPeriod + TimeUnit.SECONDS.toMillis(2); RedissonClient r = createInstance(); + RBucket b = r.getBucket("test1"); + for (int i = 0; i < 10; i++) { + b.get(); + } + long startTime = System.currentTimeMillis(); r.shutdown(quietPeriod, timeOut, TimeUnit.MILLISECONDS); long shutdownTime = System.currentTimeMillis() - startTime;