Fixed Out of memory problem during RedissonExecutorService usage. #1158

pull/1204/head
Nikita 7 years ago
parent 84f5629143
commit 959cca4fe1

@ -23,7 +23,6 @@ import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -59,6 +58,7 @@ import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.ThreadLocalRandom;
/**
@ -381,26 +381,45 @@ public abstract class BaseRemoteService {
private <T extends RRemoteServiceResponse> RPromise<T> poll(final long timeout,
final String requestId, final String responseName) {
final RPromise<T> responseFuture = new RedissonPromise<T>();
ResponseEntry entry = responses.get(responseName);
if (entry == null) {
entry = new ResponseEntry();
ResponseEntry oldEntry = responses.putIfAbsent(responseName, entry);
if (oldEntry != null) {
entry = oldEntry;
ResponseEntry entry;
synchronized (responses) {
entry = responses.get(responseName);
if (entry == null) {
entry = new ResponseEntry();
ResponseEntry oldEntry = responses.putIfAbsent(responseName, entry);
if (oldEntry != null) {
entry = oldEntry;
}
}
final ConcurrentMap<String, RPromise<? extends RRemoteServiceResponse>> entryResponses = entry.getResponses();
entryResponses.put(requestId, responseFuture);
}
final ConcurrentMap<String, RPromise<? extends RRemoteServiceResponse>> responses = entry.getResponses();
commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
ScheduledFuture<?> future = commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
if (!responseFuture.isDone() && responses.remove(requestId, responseFuture)) {
responseFuture.trySuccess(null);
synchronized (responses) {
ResponseEntry entry = responses.get(responseName);
if (entry == null) {
return;
}
ConcurrentMap<String, RPromise<? extends RRemoteServiceResponse>> entryResponses = entry.getResponses();
RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms");
if (responseFuture.tryFailure(ex)) {
entry.getTimeouts().remove(requestId);
entryResponses.remove(requestId, responseFuture);
if (entry.getResponses().isEmpty()) {
responses.remove(responseName, entry);
}
}
}
}
}, timeout, TimeUnit.MILLISECONDS);
responses.put(requestId, responseFuture);
entry.getTimeouts().put(requestId, future);
pollTasks(entry, responseName);
return responseFuture;
}
@ -422,19 +441,29 @@ public abstract class BaseRemoteService {
}
RRemoteServiceResponse response = future.getNow();
RPromise<RRemoteServiceResponse> promise = (RPromise<RRemoteServiceResponse>) entry.getResponses().remove(response.getId());
RPromise<RRemoteServiceResponse> promise;
synchronized (responses) {
ResponseEntry entry = responses.get(responseName);
if (entry == null) {
return;
}
ConcurrentMap<String, RPromise<? extends RRemoteServiceResponse>> entryResponses = entry.getResponses();
promise = (RPromise<RRemoteServiceResponse>) entryResponses.remove(response.getId());
java.util.concurrent.ScheduledFuture<?> timeoutFuture = entry.getTimeouts().remove(response.getId());
timeoutFuture.cancel(false);
if (entryResponses.isEmpty()) {
responses.remove(responseName, entry);
} else {
responseQueue.takeAsync().addListener(this);
}
}
if (promise != null) {
promise.trySuccess(response);
}
if (!entry.getResponses().isEmpty()) {
responseQueue.takeAsync().addListener(this);
} else {
entry.getStarted().set(false);
if (!entry.getResponses().isEmpty()) {
pollTasks(entry, responseName);
}
}
}
});
}
@ -493,7 +522,7 @@ public abstract class BaseRemoteService {
RemoteServiceResponse response = (RemoteServiceResponse) responseQueue
.poll(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS);
if (response == null) {
throw new RemoteServiceTimeoutException("No response1 after "
throw new RemoteServiceTimeoutException("No response after "
+ optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
}
if (response.getError() != null) {

@ -16,6 +16,7 @@
package org.redisson.remote;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.misc.RPromise;
@ -30,8 +31,14 @@ import io.netty.util.internal.PlatformDependent;
public class ResponseEntry {
private final ConcurrentMap<String, RPromise<? extends RRemoteServiceResponse>> responses = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<String, ScheduledFuture<?>> timeouts = PlatformDependent.newConcurrentHashMap();
private final AtomicBoolean started = new AtomicBoolean();
public ConcurrentMap<String, ScheduledFuture<?>> getTimeouts() {
return timeouts;
}
public ConcurrentMap<String, RPromise<? extends RRemoteServiceResponse>> getResponses() {
return responses;
}

Loading…
Cancel
Save