Scheduled tasks memory consumption reduced. #1158

pull/1204/head
Nikita 7 years ago
parent 6d39a91a55
commit a5c26a13dc

@ -51,8 +51,8 @@ import org.redisson.remote.RemoteServiceCancelResponse;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RemoteServiceResponse;
import org.redisson.remote.RemoteServiceTimeoutException;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import org.redisson.remote.ResponseEntry.Key;
import org.redisson.remote.ResponseEntry.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -106,10 +106,15 @@ public abstract class BaseRemoteService {
return "{remote_response}:" + executorId;
}
protected String getAckName(RequestId requestId) {
return "{" + name + ":remote" + "}:" + requestId + ":ack";
}
protected String getAckName(String requestId) {
return "{" + name + ":remote" + "}:" + requestId + ":ack";
}
public String getRequestQueueName(Class<?> remoteInterface) {
String str = requestQueueNameCache.get(remoteInterface);
if (str == null) {
@ -174,7 +179,7 @@ public abstract class BaseRemoteService {
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
final String requestId = generateRequestId();
final RequestId requestId = generateRequestId();
if (method.getName().equals("toString")) {
return getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + requestId;
@ -191,10 +196,6 @@ public abstract class BaseRemoteService {
final String requestQueueName = getRequestQueueName(syncInterface);
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, codec);
final RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId, method.getName(), getMethodSignatures(method), args,
optionsCopy, System.currentTimeMillis());
final RFuture<RemoteServiceAck> ackFuture;
if (optionsCopy.isAckExpected()) {
ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId, false);
@ -209,7 +210,12 @@ public abstract class BaseRemoteService {
responseFuture = null;
}
final RemotePromise<Object> result = new RemotePromise<Object>(requestId) {
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args,
optionsCopy, System.currentTimeMillis());
final Long ackTimeout = request.getOptions().getAckTimeoutInMillis();
final RemotePromise<Object> result = new RemotePromise<Object>(requestId, getParam(request)) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
@ -234,7 +240,7 @@ public abstract class BaseRemoteService {
+ "end;"
+ "return 0;",
Arrays.<Object> asList(ackName, responseQueueName, requestQueueName),
encode(request), request.getOptions().getAckTimeoutInMillis());
requestId, ackTimeout);
boolean ackNotSent = commandExecutor.get(future);
if (ackNotSent) {
@ -242,21 +248,19 @@ public abstract class BaseRemoteService {
return true;
}
return cancel(requestId, request, mayInterruptIfRunning);
return doCancel(mayInterruptIfRunning);
}
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, codec);
boolean removed = commandExecutor.get(removeAsync(requestQueue, request));
boolean removed = commandExecutor.get(removeAsync(requestQueueName, requestId));
if (removed) {
super.cancel(mayInterruptIfRunning);
return true;
}
return cancel(requestId, request, mayInterruptIfRunning);
return doCancel(mayInterruptIfRunning);
}
private boolean cancel(String requestId, RemoteServiceRequest request,
boolean mayInterruptIfRunning) {
private boolean doCancel(boolean mayInterruptIfRunning) {
if (isCancelled()) {
return true;
}
@ -265,7 +269,7 @@ public abstract class BaseRemoteService {
return false;
}
cancelExecution(optionsCopy, request, mayInterruptIfRunning, this, responseFuture);
cancelExecution(optionsCopy, mayInterruptIfRunning, this, responseFuture);
try {
awaitUninterruptibly(60, TimeUnit.SECONDS);
@ -276,7 +280,7 @@ public abstract class BaseRemoteService {
}
};
RFuture<Boolean> addFuture = addAsync(requestQueue, request, result);
RFuture<Boolean> addFuture = addAsync(requestQueueName, request, result);
addFuture.addListener(new FutureListener<Boolean>() {
@Override
@ -320,7 +324,7 @@ public abstract class BaseRemoteService {
if (ack == null) {
final String ackName = getAckName(requestId);
RFuture<RemoteServiceAck> ackFutureAttempt =
tryPollAckAgainAsync(optionsCopy, ackName, request.getId());
tryPollAckAgainAsync(optionsCopy, ackName, requestId);
ackFutureAttempt.addListener(new FutureListener<RemoteServiceAck>() {
@Override
@ -335,22 +339,22 @@ public abstract class BaseRemoteService {
Exception ex = new RemoteServiceAckTimeoutException(
"No ACK response after "
+ optionsCopy.getAckTimeoutInMillis()
+ "ms for request: " + request);
+ "ms for request: " + requestId);
result.tryFailure(ex);
return;
}
awaitResultAsync(optionsCopy, result, request, ackName, responseFuture);
awaitResultAsync(optionsCopy, result, ackName, responseFuture);
}
});
} else {
awaitResultAsync(optionsCopy, result, request, responseFuture);
awaitResultAsync(optionsCopy, result, responseFuture);
}
}
});
} else {
awaitResultAsync(optionsCopy, result, request, responseFuture);
awaitResultAsync(optionsCopy, result, responseFuture);
}
}
});
@ -362,8 +366,12 @@ public abstract class BaseRemoteService {
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler);
}
protected Object getParam(RemoteServiceRequest request) {
return null;
}
private void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result,
final RemoteServiceRequest request, final String ackName, final RFuture<RRemoteServiceResponse> responseFuture) {
final String ackName, final RFuture<RRemoteServiceResponse> responseFuture) {
RFuture<Boolean> deleteFuture = redisson.getBucket(ackName).deleteAsync();
deleteFuture.addListener(new FutureListener<Boolean>() {
@Override
@ -373,19 +381,18 @@ public abstract class BaseRemoteService {
return;
}
awaitResultAsync(optionsCopy, result, request, responseFuture);
awaitResultAsync(optionsCopy, result, responseFuture);
}
});
}
protected void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result,
final RemoteServiceRequest request, RFuture<RRemoteServiceResponse> responseFuture) {
RFuture<RRemoteServiceResponse> responseFuture) {
// poll for the response only if expected
if (!optionsCopy.isResultExpected()) {
return;
}
final String requestId = request.getId();
responseFuture.addListener(new FutureListener<RRemoteServiceResponse>() {
@Override
@ -397,7 +404,7 @@ public abstract class BaseRemoteService {
if (future.getNow() == null) {
RemoteServiceTimeoutException e = new RemoteServiceTimeoutException("No response after "
+ optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + requestId);
+ optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + result.getRequestId());
result.tryFailure(e);
return;
}
@ -419,8 +426,7 @@ public abstract class BaseRemoteService {
}
private <T extends RRemoteServiceResponse> RPromise<T> poll(final long timeout,
String requestId, boolean insertFirst) {
final Key key = new Key(requestId);
final RequestId requestId, boolean insertFirst) {
final RPromise<T> responseFuture = new RedissonPromise<T>();
ResponseEntry entry;
@ -440,7 +446,7 @@ public abstract class BaseRemoteService {
if (future.isCancelled()) {
synchronized (responses) {
ResponseEntry entry = responses.get(responseQueueName);
List<Result> list = entry.getResponses().get(key);
List<Result> list = entry.getResponses().get(requestId);
for (Iterator<Result> iterator = list.iterator(); iterator.hasNext();) {
Result result = iterator.next();
if (result.getPromise() == responseFuture) {
@ -449,7 +455,7 @@ public abstract class BaseRemoteService {
}
}
if (list.isEmpty()) {
entry.getResponses().remove(key);
entry.getResponses().remove(requestId);
}
if (entry.getResponses().isEmpty()) {
@ -472,9 +478,12 @@ public abstract class BaseRemoteService {
RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms");
if (responseFuture.tryFailure(ex)) {
List<Result> list = entry.getResponses().get(key);
List<Result> list = entry.getResponses().get(requestId);
list.remove(0);
if (list.isEmpty()) {
entry.getResponses().remove(requestId);
}
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);
}
}
@ -482,11 +491,11 @@ public abstract class BaseRemoteService {
}
}, timeout, TimeUnit.MILLISECONDS);
final Map<Key, List<Result>> entryResponses = entry.getResponses();
List<Result> list = entryResponses.get(key);
final Map<RequestId, List<Result>> entryResponses = entry.getResponses();
List<Result> list = entryResponses.get(requestId);
if (list == null) {
list = new ArrayList<Result>();
entryResponses.put(key, list);
list = new ArrayList<Result>(3);
entryResponses.put(requestId, list);
}
Result res = new Result(responseFuture, future);
@ -523,12 +532,17 @@ public abstract class BaseRemoteService {
synchronized (responses) {
ResponseEntry entry = responses.get(responseQueueName);
if (entry == null) {
log.error("Can't find entry " + responseQueueName);
return;
}
Key key = new Key(response.getId());
RequestId key = new RequestId(response.getId());
List<Result> list = entry.getResponses().get(key);
if (list == null) {
RBlockingQueue<RRemoteServiceResponse> responseQueue = redisson.getBlockingQueue(responseQueueName, codec);
responseQueue.takeAsync().addListener(this);
return;
}
Result res = list.remove(0);
if (list.isEmpty()) {
entry.getResponses().remove(key);
@ -572,13 +586,15 @@ public abstract class BaseRemoteService {
&& !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE)))
throw new IllegalArgumentException("The noResult option only supports void return value");
String requestId = generateRequestId();
RequestId requestId = generateRequestId();
String requestQueueName = getRequestQueueName(remoteInterface);
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, codec);
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId, method.getName(), getMethodSignatures(method), args, optionsCopy,
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, optionsCopy,
System.currentTimeMillis());
requestQueue.add(request);
RemotePromise<Object> addPromise = new RemotePromise<Object>(requestId, null);
addAsync(requestQueueName, request, addPromise);
addPromise.getAddFuture().sync();
RBlockingQueue<RRemoteServiceResponse> responseQueue = null;
if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) {
@ -641,7 +657,7 @@ public abstract class BaseRemoteService {
}
private RFuture<RemoteServiceAck> tryPollAckAgainAsync(final RemoteInvocationOptions optionsCopy,
String ackName, final String requestId) {
String ackName, final RequestId requestId) {
final RPromise<RemoteServiceAck> promise = new RedissonPromise<RemoteServiceAck>();
RFuture<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
@ -680,7 +696,7 @@ public abstract class BaseRemoteService {
return promise;
}
protected <T> void scheduleCheck(final String mapName, final String requestId, final RPromise<T> cancelRequest) {
protected <T> void scheduleCheck(final String mapName, final RequestId requestId, final RPromise<T> cancelRequest) {
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
@ -689,7 +705,7 @@ public abstract class BaseRemoteService {
}
RMap<String, T> canceledRequests = redisson.getMap(mapName, codec);
RFuture<T> future = canceledRequests.getAsync(requestId);
RFuture<T> future = canceledRequests.getAsync(requestId.toString());
future.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
@ -713,35 +729,47 @@ public abstract class BaseRemoteService {
}, 3000, TimeUnit.MILLISECONDS);
}
protected String generateRequestId() {
protected RequestId generateRequestId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
return new RequestId(id);
}
protected RFuture<Boolean> addAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request,
protected RFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request,
RemotePromise<Object> result) {
RFuture<Boolean> future = requestQueue.addAsync(request);
RFuture<Boolean> future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('hset', KEYS[2], ARGV[1], ARGV[2]);"
+ "redis.call('rpush', KEYS[1], ARGV[1]); "
+ "return 1;",
Arrays.<Object>asList(requestQueueName, requestQueueName + ":tasks"),
request.getId(), encode(request));
result.setAddFuture(future);
return future;
}
protected RFuture<Boolean> removeAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) {
return requestQueue.removeAsync(request);
protected RFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('lrem', KEYS[1], 1, ARGV[1]); "
+ "redis.call('hset', KEYS[2], ARGV[1]);"
+ "return 1;",
Arrays.<Object>asList(requestQueueName, requestQueueName + ":tasks"),
taskId.toString());
}
private void cancelExecution(RemoteInvocationOptions optionsCopy,
RemoteServiceRequest request, boolean mayInterruptIfRunning, RemotePromise<Object> remotePromise, RFuture<RRemoteServiceResponse> responseFuture) {
boolean mayInterruptIfRunning, RemotePromise<Object> remotePromise, RFuture<RRemoteServiceResponse> responseFuture) {
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, codec);
canceledRequests.putAsync(request.getId(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false));
canceledRequests.putAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);
// subscribe for async result if it's not expected before
if (!optionsCopy.isResultExpected()) {
RemoteInvocationOptions options = new RemoteInvocationOptions(optionsCopy);
options.expectResultWithin(60, TimeUnit.SECONDS);
awaitResultAsync(options, remotePromise, request, responseFuture);
responseFuture = poll(options.getExecutionTimeoutInMillis(), remotePromise.getRequestId(), false);
awaitResultAsync(options, remotePromise, responseFuture);
}
}

@ -69,6 +69,7 @@ import org.redisson.misc.Injector;
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -221,8 +222,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
"local expiredTaskIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredTaskIds > 0 then "
+ "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));"
+ "local expiredTasks = redis.call('hmget', KEYS[3], unpack(expiredTaskIds));"
+ "redis.call('rpush', KEYS[1], unpack(expiredTasks));"
+ "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));"
+ "end; "
// get startTime from scheduler queue head task
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
@ -230,7 +230,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(requestQueueName, schedulerQueueName, tasksName),
Arrays.<Object>asList(requestQueueName, schedulerQueueName),
System.currentTimeMillis(), 100);
}
};
@ -769,7 +769,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public boolean cancelTask(String taskId) {
RFuture<Boolean> scheduledFuture = scheduledRemoteService.cancelExecutionAsync(taskId);
RFuture<Boolean> scheduledFuture = scheduledRemoteService.cancelExecutionAsync(new RequestId(taskId));
return commandExecutor.get(scheduledFuture);
}

@ -33,7 +33,9 @@ import org.redisson.api.RRemoteService;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -45,6 +47,7 @@ import org.redisson.remote.RemoteServiceKey;
import org.redisson.remote.RemoteServiceMethod;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RemoteServiceResponse;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,7 +66,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();
private final Map<Class<?>, Set<RFuture<RemoteServiceRequest>>> futures = PlatformDependent.newConcurrentHashMap();
private final Map<Class<?>, Set<RFuture<String>>> futures = PlatformDependent.newConcurrentHashMap();
public RedissonRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
@ -81,19 +84,19 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
beans.remove(key);
}
Set<RFuture<RemoteServiceRequest>> removedFutures = futures.remove(remoteInterface);
Set<RFuture<String>> removedFutures = futures.remove(remoteInterface);
if (removedFutures == null) {
return;
}
for (RFuture<RemoteServiceRequest> future : removedFutures) {
for (RFuture<String> future : removedFutures) {
future.cancel(false);
}
}
@Override
public int getFreeWorkers(Class<?> remoteInterface) {
Set<RFuture<RemoteServiceRequest>> futuresSet = futures.get(remoteInterface);
Set<RFuture<String>> futuresSet = futures.get(remoteInterface);
return futuresSet.size();
}
@ -115,28 +118,28 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
}
Set<RFuture<RemoteServiceRequest>> values = Collections.newSetFromMap(PlatformDependent.<RFuture<RemoteServiceRequest>, Boolean>newConcurrentHashMap());
Set<RFuture<String>> values = Collections.newSetFromMap(PlatformDependent.<RFuture<String>, Boolean>newConcurrentHashMap());
futures.put(remoteInterface, values);
String requestQueueName = getRequestQueueName(remoteInterface);
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, codec);
RBlockingQueue<String> requestQueue = redisson.getBlockingQueue(requestQueueName, StringCodec.INSTANCE);
for (int i = 0; i < workers; i++) {
subscribe(remoteInterface, requestQueue, executor);
}
}
private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue,
private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<String> requestQueue,
final ExecutorService executor) {
Set<RFuture<RemoteServiceRequest>> futuresSet = futures.get(remoteInterface);
Set<RFuture<String>> futuresSet = futures.get(remoteInterface);
if (futuresSet == null) {
return;
}
final RFuture<RemoteServiceRequest> take = requestQueue.takeAsync();
final RFuture<String> take = requestQueue.takeAsync();
futuresSet.add(take);
take.addListener(new FutureListener<RemoteServiceRequest>() {
take.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<RemoteServiceRequest> future) throws Exception {
Set<RFuture<RemoteServiceRequest>> futuresSet = futures.get(remoteInterface);
public void operationComplete(Future<String> future) throws Exception {
Set<RFuture<String>> futuresSet = futures.get(remoteInterface);
if (futuresSet == null) {
return;
}
@ -156,63 +159,83 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
// https://github.com/mrniko/redisson/issues/493
// subscribe(remoteInterface, requestQueue);
final RemoteServiceRequest request = future.getNow();
// check the ack only if expected
if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request
.getOptions().getAckTimeoutInMillis()) {
log.debug("request: {} has been skipped due to ackTimeout");
// re-subscribe after a skipped ackTimeout
subscribe(remoteInterface, requestQueue, executor);
return;
}
final String requestId = future.getNow();
RMap<String, RemoteServiceRequest> tasks = redisson.getMap(requestQueue.getName() + ":tasks", new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RFuture<RemoteServiceRequest> taskFuture = tasks.getAsync(requestId);
taskFuture.addListener(new FutureListener<RemoteServiceRequest>() {
@Override
public void operationComplete(Future<RemoteServiceRequest> future) throws Exception {
if (!future.isSuccess()) {
if (future.cause() instanceof RedissonShutdownException) {
return;
}
log.error("Can't process the remote service request with id " + requestId, future.cause());
// re-subscribe after a failed takeAsync
subscribe(remoteInterface, requestQueue, executor);
return;
}
final RemoteServiceRequest request = future.getNow();
// check the ack only if expected
if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request
.getOptions().getAckTimeoutInMillis()) {
log.debug("request: {} has been skipped due to ackTimeout");
// re-subscribe after a skipped ackTimeout
subscribe(remoteInterface, requestQueue, executor);
return;
}
final String responseName = getResponseQueueName(request.getExecutorId());
// send the ack only if expected
if (request.getOptions().isAckExpected()) {
String ackName = getAckName(request.getId());
RFuture<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(responseName,
LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
+ "redis.call('rpush', KEYS[2], ARGV[1]);"
// + "redis.call('pexpire', KEYS[2], ARGV[2]);"
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(ackName, responseName),
encode(new RemoteServiceAck(request.getId())), request.getOptions().getAckTimeoutInMillis());
ackClientsFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
if (future.cause() instanceof RedissonShutdownException) {
return;
final String responseName = getResponseQueueName(request.getExecutorId());
// send the ack only if expected
if (request.getOptions().isAckExpected()) {
String ackName = getAckName(request.getId());
RFuture<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(responseName,
LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
+ "redis.call('rpush', KEYS[2], ARGV[1]);"
// + "redis.call('pexpire', KEYS[2], ARGV[2]);"
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(ackName, responseName),
encode(new RemoteServiceAck(request.getId())), request.getOptions().getAckTimeoutInMillis());
ackClientsFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
if (future.cause() instanceof RedissonShutdownException) {
return;
}
log.error("Can't send ack for request: " + request, future.cause());
// re-subscribe after a failed send (ack)
subscribe(remoteInterface, requestQueue, executor);
return;
}
if (!future.getNow()) {
subscribe(remoteInterface, requestQueue, executor);
return;
}
executeMethod(remoteInterface, requestQueue, executor, request);
}
log.error("Can't send ack for request: " + request, future.cause());
// re-subscribe after a failed send (ack)
subscribe(remoteInterface, requestQueue, executor);
return;
}
if (!future.getNow()) {
subscribe(remoteInterface, requestQueue, executor);
return;
}
executeMethod(remoteInterface, requestQueue, executor, request);
}
});
} else {
executeMethod(remoteInterface, requestQueue, executor, request);
}
});
} else {
executeMethod(remoteInterface, requestQueue, executor, request);
}
}
});
}
});
}
private <T> void executeMethod(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue,
private <T> void executeMethod(final Class<T> remoteInterface, final RBlockingQueue<String> requestQueue,
final ExecutorService executor, final RemoteServiceRequest request) {
final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName(), request.getSignatures()));
final String responseName = getResponseQueueName(request.getExecutorId());
@ -221,7 +244,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
final AtomicReference<RRemoteServiceResponse> responseHolder = new AtomicReference<RRemoteServiceResponse>();
final RPromise<RemoteServiceCancelRequest> cancelRequestFuture = new RedissonPromise<RemoteServiceCancelRequest>();
scheduleCheck(cancelRequestMapName, request.getId(), cancelRequestFuture);
scheduleCheck(cancelRequestMapName, new RequestId(request.getId()), cancelRequestFuture);
final java.util.concurrent.Future<?> submitFuture = executor.submit(new Runnable() {
@Override
@ -257,7 +280,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
private <T> void invokeMethod(final Class<T> remoteInterface,
final RBlockingQueue<RemoteServiceRequest> requestQueue, final RemoteServiceRequest request,
final RBlockingQueue<String> requestQueue, final RemoteServiceRequest request,
RemoteServiceMethod method, String responseName, final ExecutorService executor,
RFuture<RemoteServiceCancelRequest> cancelRequestFuture, final AtomicReference<RRemoteServiceResponse> responseHolder) {
try {

@ -18,6 +18,7 @@ package org.redisson.executor;
import org.redisson.api.RExecutorFuture;
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise;
import org.redisson.remote.RequestId;
/**
*
@ -27,16 +28,16 @@ import org.redisson.misc.RPromise;
*/
public class RedissonExecutorFuture<V> extends PromiseDelegator<V> implements RExecutorFuture<V> {
private final String taskId;
private final RequestId taskId;
public RedissonExecutorFuture(RPromise<V> promise, String taskId) {
public RedissonExecutorFuture(RPromise<V> promise, RequestId taskId) {
super(promise);
this.taskId = taskId;
}
@Override
public String getTaskId() {
return taskId;
return taskId.toString();
}
}

@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RScheduledFuture;
import org.redisson.misc.PromiseDelegator;
import org.redisson.remote.RequestId;
/**
*
@ -30,7 +31,7 @@ import org.redisson.misc.PromiseDelegator;
public class RedissonScheduledFuture<V> extends PromiseDelegator<V> implements RScheduledFuture<V> {
private final long scheduledExecutionTime;
private final String taskId;
private final RequestId taskId;
public RedissonScheduledFuture(RemotePromise<V> promise, long scheduledExecutionTime) {
super(promise);
@ -62,7 +63,7 @@ public class RedissonScheduledFuture<V> extends PromiseDelegator<V> implements R
@Override
public String getTaskId() {
return taskId;
return taskId.toString();
}
}

@ -17,6 +17,7 @@ package org.redisson.executor;
import org.redisson.api.RFuture;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RequestId;
/**
*
@ -25,15 +26,21 @@ import org.redisson.misc.RedissonPromise;
*/
public class RemotePromise<T> extends RedissonPromise<T> {
private String requestId;
private final Object param;
private final RequestId requestId;
private RFuture<Boolean> addFuture;
public RemotePromise(String requestId) {
public RemotePromise(RequestId requestId, Object param) {
super();
this.requestId = requestId;
this.param = param;
}
public String getRequestId() {
public <P> P getParam() {
return (P) param;
}
public RequestId getRequestId() {
return requestId;
}

@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.RedissonExecutorService;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
@ -30,6 +29,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
/**
@ -39,7 +39,7 @@ import org.redisson.remote.ResponseEntry;
*/
public class ScheduledTasksService extends TasksService {
private String requestId;
private RequestId requestId;
private String schedulerQueueName;
private String schedulerChannelName;
@ -47,7 +47,7 @@ public class ScheduledTasksService extends TasksService {
super(codec, redisson, name, commandExecutor, redissonId, responses);
}
public void setRequestId(String requestId) {
public void setRequestId(RequestId requestId) {
this.requestId = requestId;
}
@ -60,7 +60,7 @@ public class ScheduledTasksService extends TasksService {
}
@Override
protected RFuture<Boolean> addAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) {
protected RFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request) {
int requestIndex = 0;
if ("scheduleCallable".equals(request.getMethodName())
|| "scheduleRunnable".equals(request.getMethodName())) {
@ -114,32 +114,38 @@ public class ScheduledTasksService extends TasksService {
startTime, request.getId(), encode(request));
}
@Override
protected Object getParam(RemoteServiceRequest request) {
Long startTime = 0L;
if (request != null && request.getArgs() != null && request.getArgs().length > 3) {
startTime = (Long)request.getArgs()[3];
}
return startTime;
}
@Override
protected void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result,
final RemoteServiceRequest request, final RFuture<RRemoteServiceResponse> responseFuture) {
final RFuture<RRemoteServiceResponse> responseFuture) {
if (!optionsCopy.isResultExpected()) {
return;
}
Long startTime = 0L;
if (request != null && request.getArgs() != null && request.getArgs().length > 3) {
startTime = (Long)request.getArgs()[3];
}
long startTime = result.getParam();
long delay = startTime - System.currentTimeMillis();
if (delay > 0) {
commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
ScheduledTasksService.super.awaitResultAsync(optionsCopy, result, request, responseFuture);
ScheduledTasksService.super.awaitResultAsync(optionsCopy, result, responseFuture);
}
}, (long)(delay - delay*0.10), TimeUnit.MILLISECONDS);
} else {
super.awaitResultAsync(optionsCopy, result, request, responseFuture);
super.awaitResultAsync(optionsCopy, result, responseFuture);
}
}
@Override
protected RFuture<Boolean> removeAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) {
protected RFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove from scheduler queue
"if redis.call('zrem', KEYS[2], ARGV[1]) > 0 then "
@ -155,7 +161,7 @@ public class ScheduledTasksService extends TasksService {
+ "end;"
+ "local task = redis.call('hget', KEYS[6], ARGV[1]); "
// remove from executor queue
+ "if task ~= false and redis.call('lrem', KEYS[1], 1, task) > 0 then "
+ "if task ~= false and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then "
+ "redis.call('hdel', KEYS[6], ARGV[1]); "
+ "if redis.call('decr', KEYS[3]) == 0 then "
+ "redis.call('del', KEYS[3]);"
@ -169,12 +175,12 @@ public class ScheduledTasksService extends TasksService {
// delete scheduled task
+ "redis.call('hdel', KEYS[6], ARGV[1]); "
+ "return 0;",
Arrays.<Object>asList(requestQueue.getName(), schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName),
request.getId(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
Arrays.<Object>asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName),
taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
}
@Override
protected String generateRequestId() {
protected RequestId generateRequestId() {
if (requestId == null) {
return super.generateRequestId();
}

@ -31,6 +31,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.Injector;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import io.netty.buffer.ByteBuf;
@ -137,7 +138,7 @@ public class TasksRunnerService implements RemoteExecutorService {
scheduledRemoteService.setSchedulerQueueName(schedulerQueueName);
scheduledRemoteService.setSchedulerChannelName(schedulerChannelName);
scheduledRemoteService.setTasksName(tasksName);
scheduledRemoteService.setRequestId(requestId);
scheduledRemoteService.setRequestId(new RequestId(requestId));
RemoteExecutorServiceAsync asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
return asyncScheduledServiceAtFixed;
}

@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit;
import org.redisson.BaseRemoteService;
import org.redisson.RedissonExecutorService;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
@ -34,6 +33,7 @@ import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RemoteServiceCancelRequest;
import org.redisson.remote.RemoteServiceCancelResponse;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import io.netty.util.concurrent.Future;
@ -72,10 +72,10 @@ public class TasksService extends BaseRemoteService {
}
@Override
protected final RFuture<Boolean> addAsync(RBlockingQueue<RemoteServiceRequest> requestQueue,
protected final RFuture<Boolean> addAsync(String requestQueueName,
RemoteServiceRequest request, RemotePromise<Object> result) {
final RPromise<Boolean> promise = new RedissonPromise<Boolean>();
RFuture<Boolean> future = addAsync(requestQueue, request);
RFuture<Boolean> future = addAsync(requestQueueName, request);
result.setAddFuture(future);
future.addListener(new FutureListener<Boolean>() {
@ -102,26 +102,26 @@ public class TasksService extends BaseRemoteService {
return commandExecutor;
}
protected RFuture<Boolean> addAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) {
protected RFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request) {
request.getArgs()[3] = request.getId();
return getAddCommandExecutor().evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('exists', KEYS[2]) == 0 then "
+ "redis.call('rpush', KEYS[3], ARGV[2]); "
+ "redis.call('hset', KEYS[4], ARGV[1], ARGV[2]);"
+ "redis.call('rpush', KEYS[3], ARGV[1]); "
+ "redis.call('incr', KEYS[1]);"
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(tasksCounterName, statusName, requestQueue.getName(), tasksName),
Arrays.<Object>asList(tasksCounterName, statusName, requestQueueName, tasksName),
request.getId(), encode(request));
}
@Override
protected RFuture<Boolean> removeAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) {
protected RFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local task = redis.call('hget', KEYS[5], ARGV[1]); " +
"if task ~= false and redis.call('lrem', KEYS[1], 1, task) > 0 then "
"if task ~= false and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then "
+ "redis.call('hdel', KEYS[5], ARGV[1]); "
+ "if redis.call('decr', KEYS[2]) == 0 then "
+ "redis.call('del', KEYS[2]);"
@ -132,13 +132,12 @@ public class TasksService extends BaseRemoteService {
+ "end;"
+ "return 1;"
+ "end;"
+ "redis.call('hdel', KEYS[5], ARGV[1]); "
+ "return 0;",
Arrays.<Object>asList(requestQueue.getName(), tasksCounterName, statusName, terminationTopicName, tasksName),
request.getId(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
Arrays.<Object>asList(requestQueueName, tasksCounterName, statusName, terminationTopicName, tasksName),
taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
}
public RFuture<Boolean> cancelExecutionAsync(final String requestId) {
public RFuture<Boolean> cancelExecutionAsync(final RequestId requestId) {
final Class<?> syncInterface = RemoteExecutorService.class;
if (!redisson.getMap(tasksName, LongCodec.INSTANCE).containsKey(requestId)) {
@ -148,10 +147,7 @@ public class TasksService extends BaseRemoteService {
final RPromise<Boolean> result = new RedissonPromise<Boolean>();
String requestQueueName = getRequestQueueName(syncInterface);
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, codec);
RemoteServiceRequest request = new RemoteServiceRequest(requestId);
RFuture<Boolean> removeFuture = removeAsync(requestQueue, request);
RFuture<Boolean> removeFuture = removeAsync(requestQueueName, requestId);
removeFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
@ -164,7 +160,7 @@ public class TasksService extends BaseRemoteService {
result.trySuccess(true);
} else {
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, codec);
canceledRequests.putAsync(requestId, new RemoteServiceCancelRequest(true, true));
canceledRequests.putAsync(requestId.toString(), new RemoteServiceCancelRequest(true, true));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);
final RPromise<RemoteServiceCancelResponse> response = new RedissonPromise<RemoteServiceCancelResponse>();

@ -0,0 +1,84 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.remote;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
/**
*
* @author Nikita Koksharov
*
*/
public class RequestId {
private final long id0;
private final long id1;
public RequestId(String id) {
this(ByteBufUtil.decodeHexDump(id));
}
public RequestId(byte[] buf) {
ByteBuf b = Unpooled.wrappedBuffer(buf);
try {
id0 = b.readLong();
id1 = b.readLong();
} finally {
b.release();
}
}
@Override
public String toString() {
ByteBuf id = Unpooled.buffer(16);
try {
id.writeLong(id0);
id.writeLong(id1);
return ByteBufUtil.hexDump(id);
} finally {
id.release();
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (id0 ^ (id0 >>> 32));
result = prime * result + (int) (id1 ^ (id1 >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
RequestId other = (RequestId) obj;
if (id0 != other.id0)
return false;
if (id1 != other.id1)
return false;
return true;
}
}

@ -23,10 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.misc.RPromise;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
/**
*
* @author Nikita Koksharov
@ -34,49 +30,6 @@ import io.netty.buffer.Unpooled;
*/
public class ResponseEntry {
public static class Key {
private final long id0;
private final long id1;
public Key(String id) {
byte[] buf = ByteBufUtil.decodeHexDump(id);
ByteBuf b = Unpooled.wrappedBuffer(buf);
try {
id0 = b.readLong();
id1 = b.readLong();
} finally {
b.release();
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (id0 ^ (id0 >>> 32));
result = prime * result + (int) (id1 ^ (id1 >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Key other = (Key) obj;
if (id0 != other.id0)
return false;
if (id1 != other.id1)
return false;
return true;
}
}
public static class Result {
private final RPromise<? extends RRemoteServiceResponse> promise;
@ -98,10 +51,10 @@ public class ResponseEntry {
}
private final Map<Key, List<Result>> responses = new HashMap<Key, List<Result>>();
private final Map<RequestId, List<Result>> responses = new HashMap<RequestId, List<Result>>();
private final AtomicBoolean started = new AtomicBoolean();
public Map<Key, List<Result>> getResponses() {
public Map<RequestId, List<Result>> getResponses() {
return responses;
}

Loading…
Cancel
Save