ScheduledFuture memory allocation optimization. #1158

pull/1204/head
Nikita 7 years ago
parent 4c8f966713
commit 353f9aa24d

@ -22,6 +22,7 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
@ -34,6 +35,7 @@ import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RRemoteAsync;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
@ -51,6 +53,7 @@ import org.redisson.remote.RemoteServiceResponse;
import org.redisson.remote.RemoteServiceTimeoutException;
import org.redisson.remote.ResponseEntry;
import org.redisson.remote.ResponseEntry.Key;
import org.redisson.remote.ResponseEntry.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -192,6 +195,20 @@ public abstract class BaseRemoteService {
final RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId, method.getName(), getMethodSignatures(method), args,
optionsCopy, System.currentTimeMillis());
final RFuture<RemoteServiceAck> ackFuture;
if (optionsCopy.isAckExpected()) {
ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId, false);
} else {
ackFuture = null;
}
final RPromise<RRemoteServiceResponse> responseFuture;
if (optionsCopy.isResultExpected()) {
responseFuture = poll(optionsCopy.getExecutionTimeoutInMillis(), requestId, false);
} else {
responseFuture = null;
}
final RemotePromise<Object> result = new RemotePromise<Object>(requestId) {
@Override
@ -248,7 +265,7 @@ public abstract class BaseRemoteService {
return false;
}
cancelExecution(optionsCopy, request, mayInterruptIfRunning, this);
cancelExecution(optionsCopy, request, mayInterruptIfRunning, this, responseFuture);
try {
awaitUninterruptibly(60, TimeUnit.SECONDS);
@ -265,16 +282,36 @@ public abstract class BaseRemoteService {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
if (responseFuture != null) {
responseFuture.cancel(false);
}
if (ackFuture != null) {
ackFuture.cancel(false);
}
result.tryFailure(future.cause());
return;
}
if (!future.get()) {
result.tryFailure(new RedisException("Task hasn't been added"));
if (responseFuture != null) {
responseFuture.cancel(false);
}
if (ackFuture != null) {
ackFuture.cancel(false);
}
return;
}
if (optionsCopy.isAckExpected()) {
RPromise<RemoteServiceAck> ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId);
ackFuture.addListener(new FutureListener<RemoteServiceAck>() {
@Override
public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
if (!future.isSuccess()) {
if (responseFuture != null) {
responseFuture.cancel(false);
}
result.tryFailure(future.cause());
return;
}
@ -303,17 +340,17 @@ public abstract class BaseRemoteService {
return;
}
awaitResultAsync(optionsCopy, result, request, ackName);
awaitResultAsync(optionsCopy, result, request, ackName, responseFuture);
}
});
} else {
awaitResultAsync(optionsCopy, result, request);
awaitResultAsync(optionsCopy, result, request, responseFuture);
}
}
});
} else {
awaitResultAsync(optionsCopy, result, request);
awaitResultAsync(optionsCopy, result, request, responseFuture);
}
}
});
@ -326,7 +363,7 @@ public abstract class BaseRemoteService {
}
private void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result,
final RemoteServiceRequest request, final String ackName) {
final RemoteServiceRequest request, final String ackName, final RFuture<RRemoteServiceResponse> responseFuture) {
RFuture<Boolean> deleteFuture = redisson.getBucket(ackName).deleteAsync();
deleteFuture.addListener(new FutureListener<Boolean>() {
@Override
@ -336,21 +373,19 @@ public abstract class BaseRemoteService {
return;
}
awaitResultAsync(optionsCopy, result, request);
awaitResultAsync(optionsCopy, result, request, responseFuture);
}
});
}
protected void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result,
final RemoteServiceRequest request) {
final RemoteServiceRequest request, RFuture<RRemoteServiceResponse> responseFuture) {
// poll for the response only if expected
if (!optionsCopy.isResultExpected()) {
return;
}
String requestId = request.getId();
RPromise<RRemoteServiceResponse> responseFuture = poll(optionsCopy.getExecutionTimeoutInMillis(), requestId);
final String requestId = request.getId();
responseFuture.addListener(new FutureListener<RRemoteServiceResponse>() {
@Override
@ -384,9 +419,9 @@ public abstract class BaseRemoteService {
}
private <T extends RRemoteServiceResponse> RPromise<T> poll(final long timeout,
String requestId) {
final RPromise<T> responseFuture = new RedissonPromise<T>();
String requestId, boolean insertFirst) {
final Key key = new Key(requestId);
final RPromise<T> responseFuture = new RedissonPromise<T>();
ResponseEntry entry;
synchronized (responses) {
@ -399,31 +434,69 @@ public abstract class BaseRemoteService {
}
}
final ConcurrentMap<Key, RPromise<? extends RRemoteServiceResponse>> entryResponses = entry.getResponses();
entryResponses.put(key, responseFuture);
}
responseFuture.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (future.isCancelled()) {
synchronized (responses) {
ResponseEntry entry = responses.get(responseQueueName);
List<Result> list = entry.getResponses().get(key);
for (Iterator<Result> iterator = list.iterator(); iterator.hasNext();) {
Result result = iterator.next();
if (result.getPromise() == responseFuture) {
result.getScheduledFuture().cancel(true);
iterator.remove();
}
}
if (list.isEmpty()) {
entry.getResponses().remove(key);
}
ScheduledFuture<?> future = commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
synchronized (responses) {
ResponseEntry entry = responses.get(responseQueueName);
if (entry == null) {
return;
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);
}
}
}
RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms");
if (responseFuture.tryFailure(ex)) {
entry.getTimeouts().remove(key);
entry.getResponses().remove(key, responseFuture);
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);
}
});
ScheduledFuture<?> future = commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
synchronized (responses) {
ResponseEntry entry = responses.get(responseQueueName);
if (entry == null) {
return;
}
RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms");
if (responseFuture.tryFailure(ex)) {
List<Result> list = entry.getResponses().get(key);
list.remove(0);
if (list.isEmpty()) {
responses.remove(responseQueueName, entry);
}
}
}
}
}, timeout, TimeUnit.MILLISECONDS);
final Map<Key, List<Result>> entryResponses = entry.getResponses();
List<Result> list = entryResponses.get(key);
if (list == null) {
list = new ArrayList<Result>();
entryResponses.put(key, list);
}
Result res = new Result(responseFuture, future);
if (insertFirst) {
list.add(0, res);
} else {
list.add(res);
}
}, timeout, TimeUnit.MILLISECONDS);
entry.getTimeouts().put(key, future);
}
pollTasks(entry);
return responseFuture;
@ -450,16 +523,21 @@ public abstract class BaseRemoteService {
synchronized (responses) {
ResponseEntry entry = responses.get(responseQueueName);
if (entry == null) {
log.error("Can't find entry " + responseQueueName);
return;
}
Key key = new Key(response.getId());
ConcurrentMap<Key, RPromise<? extends RRemoteServiceResponse>> entryResponses = entry.getResponses();
promise = (RPromise<RRemoteServiceResponse>) entryResponses.remove(key);
java.util.concurrent.ScheduledFuture<?> timeoutFuture = entry.getTimeouts().remove(key);
timeoutFuture.cancel(false);
List<Result> list = entry.getResponses().get(key);
Result res = list.remove(0);
if (list.isEmpty()) {
entry.getResponses().remove(key);
}
promise = res.getPromise();
res.getScheduledFuture().cancel(true);
if (entryResponses.isEmpty()) {
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);
} else {
RBlockingQueue<RRemoteServiceResponse> responseQueue = redisson.getBlockingQueue(responseQueueName, codec);
@ -470,7 +548,6 @@ public abstract class BaseRemoteService {
if (promise != null) {
promise.trySuccess(response);
}
}
});
}
@ -583,7 +660,7 @@ public abstract class BaseRemoteService {
}
if (future.getNow()) {
RPromise<RemoteServiceAck> ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId);
RPromise<RemoteServiceAck> ackFuture = poll(commandExecutor.getConnectionManager().getConfig().getTimeout(), requestId, true);
ackFuture.addListener(new FutureListener<RemoteServiceAck>() {
@Override
public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
@ -655,7 +732,7 @@ public abstract class BaseRemoteService {
}
private void cancelExecution(RemoteInvocationOptions optionsCopy,
RemoteServiceRequest request, boolean mayInterruptIfRunning, RemotePromise<Object> remotePromise) {
RemoteServiceRequest request, boolean mayInterruptIfRunning, RemotePromise<Object> remotePromise, RFuture<RRemoteServiceResponse> responseFuture) {
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, codec);
canceledRequests.putAsync(request.getId(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);
@ -664,7 +741,7 @@ public abstract class BaseRemoteService {
if (!optionsCopy.isResultExpected()) {
RemoteInvocationOptions options = new RemoteInvocationOptions(optionsCopy);
options.expectResultWithin(60, TimeUnit.SECONDS);
awaitResultAsync(options, remotePromise, request);
awaitResultAsync(options, remotePromise, request, responseFuture);
}
}

@ -28,6 +28,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.ResponseEntry;
@ -115,7 +116,7 @@ public class ScheduledTasksService extends TasksService {
@Override
protected void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result,
final RemoteServiceRequest request) {
final RemoteServiceRequest request, final RFuture<RRemoteServiceResponse> responseFuture) {
if (!optionsCopy.isResultExpected()) {
return;
}
@ -129,11 +130,11 @@ public class ScheduledTasksService extends TasksService {
commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
ScheduledTasksService.super.awaitResultAsync(optionsCopy, result, request);
ScheduledTasksService.super.awaitResultAsync(optionsCopy, result, request, responseFuture);
}
}, (long)(delay - delay*0.10), TimeUnit.MILLISECONDS);
} else {
super.awaitResultAsync(optionsCopy, result, request);
super.awaitResultAsync(optionsCopy, result, request, responseFuture);
}
}

@ -15,7 +15,9 @@
*/
package org.redisson.remote;
import java.util.concurrent.ConcurrentMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@ -24,7 +26,6 @@ import org.redisson.misc.RPromise;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -76,15 +77,31 @@ public class ResponseEntry {
}
private final ConcurrentMap<Key, RPromise<? extends RRemoteServiceResponse>> responses = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<Key, ScheduledFuture<?>> timeouts = PlatformDependent.newConcurrentHashMap();
private final AtomicBoolean started = new AtomicBoolean();
public ConcurrentMap<Key, ScheduledFuture<?>> getTimeouts() {
return timeouts;
public static class Result {
private final RPromise<? extends RRemoteServiceResponse> promise;
private final ScheduledFuture<?> scheduledFuture;
public Result(RPromise<? extends RRemoteServiceResponse> promise, ScheduledFuture<?> scheduledFuture) {
super();
this.promise = promise;
this.scheduledFuture = scheduledFuture;
}
public <T extends RRemoteServiceResponse> RPromise<T> getPromise() {
return (RPromise<T>) promise;
}
public ScheduledFuture<?> getScheduledFuture() {
return scheduledFuture;
}
}
public ConcurrentMap<Key, RPromise<? extends RRemoteServiceResponse>> getResponses() {
private final Map<Key, List<Result>> responses = new HashMap<Key, List<Result>>();
private final AtomicBoolean started = new AtomicBoolean();
public Map<Key, List<Result>> getResponses() {
return responses;
}

Loading…
Cancel
Save