Memory consumption reduced during scheduled tasks processing #1158

pull/1552/head
Nikita 7 years ago
parent 8661de67f1
commit 4c8f966713

@ -23,6 +23,7 @@ import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -60,6 +61,7 @@ import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
/**
@ -71,12 +73,18 @@ public abstract class BaseRemoteService {
private static final Logger log = LoggerFactory.getLogger(BaseRemoteService.class);
private final Map<Class<?>, String> requestQueueNameCache = PlatformDependent.newConcurrentHashMap();
private final Map<Method, List<String>> methodSignaturesCache = PlatformDependent.newConcurrentHashMap();
protected final Codec codec;
protected final RedissonClient redisson;
protected final String name;
protected final CommandAsyncExecutor commandExecutor;
protected final String executorId;
protected final String cancelRequestMapName;
protected final String cancelResponseMapName;
protected final String responseQueueName;
private final ConcurrentMap<String, ResponseEntry> responses;
public BaseRemoteService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
@ -86,30 +94,26 @@ public abstract class BaseRemoteService {
this.commandExecutor = commandExecutor;
this.executorId = executorId;
this.responses = responses;
this.cancelRequestMapName = "{" + name + ":remote" + "}:cancel-request";
this.cancelResponseMapName = "{" + name + ":remote" + "}:cancel-response";
this.responseQueueName = getResponseQueueName(executorId);
}
protected String getCancelRequestMapName(Class<?> remoteInterface) {
// return "{" + name + ":" + remoteInterface.getName() + "}:cancel-request";
return "{" + name + ":remote" + "}:cancel-request";
protected String getResponseQueueName(String executorId) {
return "{remote_response}:" + executorId;
}
protected String getCancelResponseMapName(Class<?> remoteInterface) {
// return "{" + name + ":" + remoteInterface.getName() + "}:cancel-response";
return "{" + name + ":remote" + "}:cancel-response";
}
protected String getAckName(Class<?> remoteInterface, String requestId) {
// return "{" + name + ":" + remoteInterface.getName() + "}:" + requestId + ":ack";
protected String getAckName(String requestId) {
return "{" + name + ":remote" + "}:" + requestId + ":ack";
}
protected String getResponseQueueName(Class<?> remoteInterface, String executorId) {
// return "{" + name + ":" + remoteInterface.getName() + "}:" + executorId;
return "{remote_response}:" + executorId;
}
public String getRequestQueueName(Class<?> remoteInterface) {
return "{" + name + ":" + remoteInterface.getName() + "}";
String str = requestQueueNameCache.get(remoteInterface);
if (str == null) {
str = "{" + name + ":" + remoteInterface.getName() + "}";
requestQueueNameCache.put(remoteInterface, str);
}
return str;
}
protected ByteBuf encode(Object obj) {
@ -164,17 +168,17 @@ public abstract class BaseRemoteService {
final Class<?> syncInterface) {
// local copy of the options, to prevent mutation
final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-"
+ generateRequestId();
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
final String requestId = generateRequestId();
if (method.getName().equals("toString")) {
return toString;
return getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + requestId;
} else if (method.getName().equals("equals")) {
return proxy == args[0];
} else if (method.getName().equals("hashCode")) {
return toString.hashCode();
return (getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + requestId).hashCode();
}
if (!optionsCopy.isResultExpected() && !(method.getReturnType().equals(Void.class)
@ -182,13 +186,9 @@ public abstract class BaseRemoteService {
throw new IllegalArgumentException("The noResult option only supports void return value");
}
final String requestId = generateRequestId();
final String requestQueueName = getRequestQueueName(syncInterface);
final String responseName = getResponseQueueName(syncInterface, executorId);
final String ackName = getAckName(syncInterface, requestId);
final RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, codec);
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, codec);
final RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId, method.getName(), getMethodSignatures(method), args,
optionsCopy, System.currentTimeMillis());
@ -204,8 +204,10 @@ public abstract class BaseRemoteService {
return false;
}
if (optionsCopy.isAckExpected()) {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(responseName, LongCodec.INSTANCE,
String ackName = getAckName(requestId);
RFuture<Boolean> future = commandExecutor.evalWriteAsync(responseQueueName, LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
@ -214,7 +216,7 @@ public abstract class BaseRemoteService {
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object> asList(ackName, responseName, requestQueueName),
Arrays.<Object> asList(ackName, responseQueueName, requestQueueName),
encode(request), request.getOptions().getAckTimeoutInMillis());
boolean ackNotSent = commandExecutor.get(future);
@ -223,19 +225,20 @@ public abstract class BaseRemoteService {
return true;
}
return cancel(syncInterface, requestId, request, mayInterruptIfRunning);
return cancel(requestId, request, mayInterruptIfRunning);
}
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, codec);
boolean removed = commandExecutor.get(removeAsync(requestQueue, request));
if (removed) {
super.cancel(mayInterruptIfRunning);
return true;
}
return cancel(syncInterface, requestId, request, mayInterruptIfRunning);
return cancel(requestId, request, mayInterruptIfRunning);
}
private boolean cancel(Class<?> remoteInterface, String requestId, RemoteServiceRequest request,
private boolean cancel(String requestId, RemoteServiceRequest request,
boolean mayInterruptIfRunning) {
if (isCancelled()) {
return true;
@ -245,8 +248,7 @@ public abstract class BaseRemoteService {
return false;
}
String canceRequestName = getCancelRequestMapName(remoteInterface);
cancelExecution(optionsCopy, responseName, request, mayInterruptIfRunning, canceRequestName, this);
cancelExecution(optionsCopy, request, mayInterruptIfRunning, this);
try {
awaitUninterruptibly(60, TimeUnit.SECONDS);
@ -268,7 +270,7 @@ public abstract class BaseRemoteService {
}
if (optionsCopy.isAckExpected()) {
RPromise<RemoteServiceAck> ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), request.getId(), responseName);
RPromise<RemoteServiceAck> ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId);
ackFuture.addListener(new FutureListener<RemoteServiceAck>() {
@Override
public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
@ -279,8 +281,9 @@ public abstract class BaseRemoteService {
RemoteServiceAck ack = future.getNow();
if (ack == null) {
final String ackName = getAckName(requestId);
RFuture<RemoteServiceAck> ackFutureAttempt =
tryPollAckAgainAsync(optionsCopy, ackName, request.getId(), responseName);
tryPollAckAgainAsync(optionsCopy, ackName, request.getId());
ackFutureAttempt.addListener(new FutureListener<RemoteServiceAck>() {
@Override
@ -300,17 +303,17 @@ public abstract class BaseRemoteService {
return;
}
awaitResultAsync(optionsCopy, result, request, responseName, ackName);
awaitResultAsync(optionsCopy, result, request, ackName);
}
});
} else {
awaitResultAsync(optionsCopy, result, request, responseName);
awaitResultAsync(optionsCopy, result, request);
}
}
});
} else {
awaitResultAsync(optionsCopy, result, request, responseName);
awaitResultAsync(optionsCopy, result, request);
}
}
});
@ -323,7 +326,7 @@ public abstract class BaseRemoteService {
}
private void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result,
final RemoteServiceRequest request, final String responseName, final String ackName) {
final RemoteServiceRequest request, final String ackName) {
RFuture<Boolean> deleteFuture = redisson.getBucket(ackName).deleteAsync();
deleteFuture.addListener(new FutureListener<Boolean>() {
@Override
@ -333,19 +336,20 @@ public abstract class BaseRemoteService {
return;
}
awaitResultAsync(optionsCopy, result, request, responseName);
awaitResultAsync(optionsCopy, result, request);
}
});
}
protected void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result,
final RemoteServiceRequest request, final String responseName) {
final RemoteServiceRequest request) {
// poll for the response only if expected
if (!optionsCopy.isResultExpected()) {
return;
}
RPromise<RRemoteServiceResponse> responseFuture = poll(optionsCopy.getExecutionTimeoutInMillis(), request.getId(), responseName);
String requestId = request.getId();
RPromise<RRemoteServiceResponse> responseFuture = poll(optionsCopy.getExecutionTimeoutInMillis(), requestId);
responseFuture.addListener(new FutureListener<RRemoteServiceResponse>() {
@ -358,7 +362,7 @@ public abstract class BaseRemoteService {
if (future.getNow() == null) {
RemoteServiceTimeoutException e = new RemoteServiceTimeoutException("No response after "
+ optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
+ optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + requestId);
result.tryFailure(e);
return;
}
@ -380,16 +384,16 @@ public abstract class BaseRemoteService {
}
private <T extends RRemoteServiceResponse> RPromise<T> poll(final long timeout,
final String requestId, final String responseName) {
String requestId) {
final RPromise<T> responseFuture = new RedissonPromise<T>();
final Key key = new Key(requestId);
ResponseEntry entry;
synchronized (responses) {
entry = responses.get(responseName);
entry = responses.get(responseQueueName);
if (entry == null) {
entry = new ResponseEntry();
ResponseEntry oldEntry = responses.putIfAbsent(responseName, entry);
ResponseEntry oldEntry = responses.putIfAbsent(responseQueueName, entry);
if (oldEntry != null) {
entry = oldEntry;
}
@ -403,7 +407,7 @@ public abstract class BaseRemoteService {
@Override
public void run() {
synchronized (responses) {
ResponseEntry entry = responses.get(responseName);
ResponseEntry entry = responses.get(responseQueueName);
if (entry == null) {
return;
}
@ -413,7 +417,7 @@ public abstract class BaseRemoteService {
entry.getTimeouts().remove(key);
entry.getResponses().remove(key, responseFuture);
if (entry.getResponses().isEmpty()) {
responses.remove(responseName, entry);
responses.remove(responseQueueName, entry);
}
}
}
@ -421,30 +425,30 @@ public abstract class BaseRemoteService {
}, timeout, TimeUnit.MILLISECONDS);
entry.getTimeouts().put(key, future);
pollTasks(entry, responseName);
pollTasks(entry);
return responseFuture;
}
private void pollTasks(final ResponseEntry entry, final String responseName) {
private void pollTasks(final ResponseEntry entry) {
if (!entry.getStarted().compareAndSet(false, true)) {
return;
}
final RBlockingQueue<RRemoteServiceResponse> responseQueue = redisson.getBlockingQueue(responseName, codec);
RBlockingQueue<RRemoteServiceResponse> responseQueue = redisson.getBlockingQueue(responseQueueName, codec);
RFuture<RRemoteServiceResponse> future = responseQueue.takeAsync();
future.addListener(new FutureListener<RRemoteServiceResponse>() {
@Override
public void operationComplete(Future<RRemoteServiceResponse> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't get response from " + responseName, future.cause());
log.error("Can't get response from " + responseQueueName, future.cause());
return;
}
RRemoteServiceResponse response = future.getNow();
RPromise<RRemoteServiceResponse> promise;
synchronized (responses) {
ResponseEntry entry = responses.get(responseName);
ResponseEntry entry = responses.get(responseQueueName);
if (entry == null) {
return;
}
@ -456,8 +460,9 @@ public abstract class BaseRemoteService {
timeoutFuture.cancel(false);
if (entryResponses.isEmpty()) {
responses.remove(responseName, entry);
responses.remove(responseQueueName, entry);
} else {
RBlockingQueue<RRemoteServiceResponse> responseQueue = redisson.getBlockingQueue(responseQueueName, codec);
responseQueue.takeAsync().addListener(this);
}
}
@ -500,13 +505,12 @@ public abstract class BaseRemoteService {
RBlockingQueue<RRemoteServiceResponse> responseQueue = null;
if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) {
String responseName = getResponseQueueName(remoteInterface, executorId);
responseQueue = redisson.getBlockingQueue(responseName, codec);
responseQueue = redisson.getBlockingQueue(responseQueueName, codec);
}
// poll for the ack only if expected
if (optionsCopy.isAckExpected()) {
String ackName = getAckName(remoteInterface, requestId);
String ackName = getAckName(requestId);
RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(),
TimeUnit.MILLISECONDS);
if (ack == null) {
@ -560,8 +564,7 @@ public abstract class BaseRemoteService {
}
private RFuture<RemoteServiceAck> tryPollAckAgainAsync(final RemoteInvocationOptions optionsCopy,
String ackName, final String requestId, final String responseName)
throws InterruptedException {
String ackName, final String requestId) {
final RPromise<RemoteServiceAck> promise = new RedissonPromise<RemoteServiceAck>();
RFuture<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
@ -580,7 +583,7 @@ public abstract class BaseRemoteService {
}
if (future.getNow()) {
RPromise<RemoteServiceAck> ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId, responseName);
RPromise<RemoteServiceAck> ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId);
ackFuture.addListener(new FutureListener<RemoteServiceAck>() {
@Override
public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
@ -651,9 +654,9 @@ public abstract class BaseRemoteService {
return requestQueue.removeAsync(request);
}
private void cancelExecution(RemoteInvocationOptions optionsCopy, String responseName,
RemoteServiceRequest request, boolean mayInterruptIfRunning, String canceRequestName, RemotePromise<Object> remotePromise) {
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(canceRequestName, codec);
private void cancelExecution(RemoteInvocationOptions optionsCopy,
RemoteServiceRequest request, boolean mayInterruptIfRunning, RemotePromise<Object> remotePromise) {
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, codec);
canceledRequests.putAsync(request.getId(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);
@ -661,15 +664,23 @@ public abstract class BaseRemoteService {
if (!optionsCopy.isResultExpected()) {
RemoteInvocationOptions options = new RemoteInvocationOptions(optionsCopy);
options.expectResultWithin(60, TimeUnit.SECONDS);
awaitResultAsync(options, remotePromise, request, responseName);
awaitResultAsync(options, remotePromise, request);
}
}
protected List<String> getMethodSignatures(Method method) {
List<String> list = new ArrayList<String>(method.getParameterTypes().length);
for (Class<?> t : method.getParameterTypes()) {
list.add(t.getName());
List<String> result = methodSignaturesCache.get(method);
if (result == null) {
result = new ArrayList<String>(method.getParameterTypes().length);
for (Class<?> t : method.getParameterTypes()) {
result.add(t.getName());
}
List<String> oldList = methodSignaturesCache.putIfAbsent(method, result);
if (oldList != null) {
result = oldList;
}
}
return list;
return result;
}
}

@ -68,7 +68,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
public RedissonRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
}
@Override
public <T> void register(Class<T> remoteInterface, T object) {
register(remoteInterface, object, 1);
@ -166,11 +166,11 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
return;
}
final String responseName = getResponseQueueName(remoteInterface, request.getExecutorId());
final String responseName = getResponseQueueName(request.getExecutorId());
// send the ack only if expected
if (request.getOptions().isAckExpected()) {
String ackName = getAckName(remoteInterface, request.getId());
String ackName = getAckName(request.getId());
RFuture<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(responseName,
LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
@ -215,13 +215,13 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
private <T> void executeMethod(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue,
final ExecutorService executor, final RemoteServiceRequest request) {
final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName(), request.getSignatures()));
final String responseName = getResponseQueueName(remoteInterface, request.getExecutorId());
final String responseName = getResponseQueueName(request.getExecutorId());
final AtomicReference<RRemoteServiceResponse> responseHolder = new AtomicReference<RRemoteServiceResponse>();
final RPromise<RemoteServiceCancelRequest> cancelRequestFuture = new RedissonPromise<RemoteServiceCancelRequest>();
scheduleCheck(getCancelRequestMapName(remoteInterface), request.getId(), cancelRequestFuture);
scheduleCheck(cancelRequestMapName, request.getId(), cancelRequestFuture);
final java.util.concurrent.Future<?> submitFuture = executor.submit(new Runnable() {
@Override
@ -247,8 +247,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
// could be removed not from future object
if (future.getNow().isSendResponse()) {
String cancelResponseName = getCancelResponseMapName(remoteInterface);
RMap<String, RemoteServiceCancelResponse> map = redisson.getMap(cancelResponseName, codec);
RMap<String, RemoteServiceCancelResponse> map = redisson.getMap(cancelResponseMapName, codec);
map.putAsync(request.getId(), response);
map.expireAsync(60, TimeUnit.SECONDS);
}

@ -115,7 +115,7 @@ public class ScheduledTasksService extends TasksService {
@Override
protected void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result,
final RemoteServiceRequest request, final String responseName) {
final RemoteServiceRequest request) {
if (!optionsCopy.isResultExpected()) {
return;
}
@ -129,11 +129,11 @@ public class ScheduledTasksService extends TasksService {
commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
ScheduledTasksService.super.awaitResultAsync(optionsCopy, result, request, responseName);
ScheduledTasksService.super.awaitResultAsync(optionsCopy, result, request);
}
}, (long)(delay - delay*0.10), TimeUnit.MILLISECONDS);
} else {
super.awaitResultAsync(optionsCopy, result, request, responseName);
super.awaitResultAsync(optionsCopy, result, request);
}
}

@ -140,7 +140,6 @@ public class TasksService extends BaseRemoteService {
public RFuture<Boolean> cancelExecutionAsync(final String requestId) {
final Class<?> syncInterface = RemoteExecutorService.class;
String requestQueueName = getRequestQueueName(syncInterface);
if (!redisson.getMap(tasksName, LongCodec.INSTANCE).containsKey(requestId)) {
return RedissonPromise.newSucceededFuture(false);
@ -148,6 +147,7 @@ public class TasksService extends BaseRemoteService {
final RPromise<Boolean> result = new RedissonPromise<Boolean>();
String requestQueueName = getRequestQueueName(syncInterface);
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, codec);
RemoteServiceRequest request = new RemoteServiceRequest(requestId);
@ -163,13 +163,10 @@ public class TasksService extends BaseRemoteService {
if (future.getNow()) {
result.trySuccess(true);
} else {
String cancelRequestMapName = getCancelRequestMapName(syncInterface);
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, codec);
canceledRequests.putAsync(requestId, new RemoteServiceCancelRequest(true, true));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);
String cancelResponseMapName = getCancelResponseMapName(syncInterface);
final RPromise<RemoteServiceCancelResponse> response = new RedissonPromise<RemoteServiceCancelResponse>();
scheduleCheck(cancelResponseMapName, requestId, response);
response.addListener(new FutureListener<RemoteServiceCancelResponse>() {

@ -46,11 +46,6 @@ public class RemoteServiceRequest implements Serializable {
this.id = id;
}
public RemoteServiceRequest(String executorId, String id) {
this.id = id;
this.executorId = executorId;
}
public RemoteServiceRequest(String executorId, String id, String methodName, List<String> signatures, Object[] args, RemoteInvocationOptions options, long date) {
super();
this.id = id;

Loading…
Cancel
Save