Fixed - Out of Memory error #6470

pull/6479/head
mrniko 3 days ago
parent 9d5b0a5d45
commit 4f9ce33cef

@ -61,6 +61,7 @@ import org.redisson.config.Protocol;
import org.redisson.config.TransportMode; import org.redisson.config.TransportMode;
import org.redisson.liveobject.resolver.MapResolver; import org.redisson.liveobject.resolver.MapResolver;
import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.FastRemovalQueue;
import org.redisson.misc.RandomXoshiro256PlusPlus; import org.redisson.misc.RandomXoshiro256PlusPlus;
import org.redisson.misc.RedisURI; import org.redisson.misc.RedisURI;
import org.redisson.remote.ResponseEntry; import org.redisson.remote.ResponseEntry;
@ -82,6 +83,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/** /**
* *
@ -375,19 +378,22 @@ public final class ServiceManager {
return socketChannelClass; return socketChannelClass;
} }
private final AtomicInteger lastFuturesCounter = new AtomicInteger(); private final FastRemovalQueue<CompletableFuture<?>> lastFutures = new FastRemovalQueue<>();
private final Deque<CompletableFuture<?>> lastFutures = new ConcurrentLinkedDeque<>();
public void addFuture(CompletableFuture<?> future) { public void addFuture(CompletableFuture<?> future) {
lastFutures.addLast(future); lastFutures.add(future);
if (lastFuturesCounter.incrementAndGet() > 100) { future.whenComplete((r, e) -> {
lastFutures.pollFirst(); lastFutures.remove(future);
lastFuturesCounter.decrementAndGet(); });
if (lastFutures.size() > 100) {
lastFutures.poll();
} }
} }
public void shutdownFutures(long timeout, TimeUnit unit) { public void shutdownFutures(long timeout, TimeUnit unit) {
CompletableFuture<Void> future = CompletableFuture.allOf(lastFutures.toArray(new CompletableFuture[0])); Stream<CompletableFuture<?>> stream = StreamSupport.stream(lastFutures.spliterator(), false);
CompletableFuture<Void> future = CompletableFuture.allOf(stream.toArray(CompletableFuture[]::new));
try { try {
future.get(timeout, unit); future.get(timeout, unit);
} catch (Exception e) { } catch (Exception e) {

@ -944,6 +944,11 @@ public class RedissonTest extends RedisDockerTest {
long quietPeriod = TimeUnit.MILLISECONDS.toMillis(50); long quietPeriod = TimeUnit.MILLISECONDS.toMillis(50);
long timeOut = quietPeriod + TimeUnit.SECONDS.toMillis(2); long timeOut = quietPeriod + TimeUnit.SECONDS.toMillis(2);
RedissonClient r = createInstance(); RedissonClient r = createInstance();
RBucket<Integer> b = r.getBucket("test1");
for (int i = 0; i < 10; i++) {
b.get();
}
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
r.shutdown(quietPeriod, timeOut, TimeUnit.MILLISECONDS); r.shutdown(quietPeriod, timeOut, TimeUnit.MILLISECONDS);
long shutdownTime = System.currentTimeMillis() - startTime; long shutdownTime = System.currentTimeMillis() - startTime;

Loading…
Cancel
Save