diff --git a/redisson/src/main/java/org/redisson/BaseRemoteService.java b/redisson/src/main/java/org/redisson/BaseRemoteService.java index ba1450f48..9344b1b0f 100644 --- a/redisson/src/main/java/org/redisson/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/BaseRemoteService.java @@ -51,8 +51,8 @@ import org.redisson.remote.RemoteServiceCancelResponse; import org.redisson.remote.RemoteServiceRequest; import org.redisson.remote.RemoteServiceResponse; import org.redisson.remote.RemoteServiceTimeoutException; +import org.redisson.remote.RequestId; import org.redisson.remote.ResponseEntry; -import org.redisson.remote.ResponseEntry.Key; import org.redisson.remote.ResponseEntry.Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,10 +106,15 @@ public abstract class BaseRemoteService { return "{remote_response}:" + executorId; } + protected String getAckName(RequestId requestId) { + return "{" + name + ":remote" + "}:" + requestId + ":ack"; + } + protected String getAckName(String requestId) { return "{" + name + ":remote" + "}:" + requestId + ":ack"; } + public String getRequestQueueName(Class remoteInterface) { String str = requestQueueNameCache.get(remoteInterface); if (str == null) { @@ -174,7 +179,7 @@ public abstract class BaseRemoteService { InvocationHandler handler = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - final String requestId = generateRequestId(); + final RequestId requestId = generateRequestId(); if (method.getName().equals("toString")) { return getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + requestId; @@ -191,10 +196,6 @@ public abstract class BaseRemoteService { final String requestQueueName = getRequestQueueName(syncInterface); - RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, codec); - final RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId, method.getName(), getMethodSignatures(method), args, - optionsCopy, System.currentTimeMillis()); - final RFuture ackFuture; if (optionsCopy.isAckExpected()) { ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId, false); @@ -209,7 +210,12 @@ public abstract class BaseRemoteService { responseFuture = null; } - final RemotePromise result = new RemotePromise(requestId) { + RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, + optionsCopy, System.currentTimeMillis()); + + final Long ackTimeout = request.getOptions().getAckTimeoutInMillis(); + + final RemotePromise result = new RemotePromise(requestId, getParam(request)) { @Override public boolean cancel(boolean mayInterruptIfRunning) { @@ -234,7 +240,7 @@ public abstract class BaseRemoteService { + "end;" + "return 0;", Arrays. asList(ackName, responseQueueName, requestQueueName), - encode(request), request.getOptions().getAckTimeoutInMillis()); + requestId, ackTimeout); boolean ackNotSent = commandExecutor.get(future); if (ackNotSent) { @@ -242,21 +248,19 @@ public abstract class BaseRemoteService { return true; } - return cancel(requestId, request, mayInterruptIfRunning); + return doCancel(mayInterruptIfRunning); } - RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, codec); - boolean removed = commandExecutor.get(removeAsync(requestQueue, request)); + boolean removed = commandExecutor.get(removeAsync(requestQueueName, requestId)); if (removed) { super.cancel(mayInterruptIfRunning); return true; } - return cancel(requestId, request, mayInterruptIfRunning); + return doCancel(mayInterruptIfRunning); } - private boolean cancel(String requestId, RemoteServiceRequest request, - boolean mayInterruptIfRunning) { + private boolean doCancel(boolean mayInterruptIfRunning) { if (isCancelled()) { return true; } @@ -265,7 +269,7 @@ public abstract class BaseRemoteService { return false; } - cancelExecution(optionsCopy, request, mayInterruptIfRunning, this, responseFuture); + cancelExecution(optionsCopy, mayInterruptIfRunning, this, responseFuture); try { awaitUninterruptibly(60, TimeUnit.SECONDS); @@ -276,7 +280,7 @@ public abstract class BaseRemoteService { } }; - RFuture addFuture = addAsync(requestQueue, request, result); + RFuture addFuture = addAsync(requestQueueName, request, result); addFuture.addListener(new FutureListener() { @Override @@ -320,7 +324,7 @@ public abstract class BaseRemoteService { if (ack == null) { final String ackName = getAckName(requestId); RFuture ackFutureAttempt = - tryPollAckAgainAsync(optionsCopy, ackName, request.getId()); + tryPollAckAgainAsync(optionsCopy, ackName, requestId); ackFutureAttempt.addListener(new FutureListener() { @Override @@ -335,22 +339,22 @@ public abstract class BaseRemoteService { Exception ex = new RemoteServiceAckTimeoutException( "No ACK response after " + optionsCopy.getAckTimeoutInMillis() - + "ms for request: " + request); + + "ms for request: " + requestId); result.tryFailure(ex); return; } - awaitResultAsync(optionsCopy, result, request, ackName, responseFuture); + awaitResultAsync(optionsCopy, result, ackName, responseFuture); } }); } else { - awaitResultAsync(optionsCopy, result, request, responseFuture); + awaitResultAsync(optionsCopy, result, responseFuture); } } }); } else { - awaitResultAsync(optionsCopy, result, request, responseFuture); + awaitResultAsync(optionsCopy, result, responseFuture); } } }); @@ -362,8 +366,12 @@ public abstract class BaseRemoteService { return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler); } + protected Object getParam(RemoteServiceRequest request) { + return null; + } + private void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise result, - final RemoteServiceRequest request, final String ackName, final RFuture responseFuture) { + final String ackName, final RFuture responseFuture) { RFuture deleteFuture = redisson.getBucket(ackName).deleteAsync(); deleteFuture.addListener(new FutureListener() { @Override @@ -373,19 +381,18 @@ public abstract class BaseRemoteService { return; } - awaitResultAsync(optionsCopy, result, request, responseFuture); + awaitResultAsync(optionsCopy, result, responseFuture); } }); } protected void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise result, - final RemoteServiceRequest request, RFuture responseFuture) { + RFuture responseFuture) { // poll for the response only if expected if (!optionsCopy.isResultExpected()) { return; } - final String requestId = request.getId(); responseFuture.addListener(new FutureListener() { @Override @@ -397,7 +404,7 @@ public abstract class BaseRemoteService { if (future.getNow() == null) { RemoteServiceTimeoutException e = new RemoteServiceTimeoutException("No response after " - + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + requestId); + + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + result.getRequestId()); result.tryFailure(e); return; } @@ -419,8 +426,7 @@ public abstract class BaseRemoteService { } private RPromise poll(final long timeout, - String requestId, boolean insertFirst) { - final Key key = new Key(requestId); + final RequestId requestId, boolean insertFirst) { final RPromise responseFuture = new RedissonPromise(); ResponseEntry entry; @@ -440,7 +446,7 @@ public abstract class BaseRemoteService { if (future.isCancelled()) { synchronized (responses) { ResponseEntry entry = responses.get(responseQueueName); - List list = entry.getResponses().get(key); + List list = entry.getResponses().get(requestId); for (Iterator iterator = list.iterator(); iterator.hasNext();) { Result result = iterator.next(); if (result.getPromise() == responseFuture) { @@ -449,7 +455,7 @@ public abstract class BaseRemoteService { } } if (list.isEmpty()) { - entry.getResponses().remove(key); + entry.getResponses().remove(requestId); } if (entry.getResponses().isEmpty()) { @@ -472,9 +478,12 @@ public abstract class BaseRemoteService { RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms"); if (responseFuture.tryFailure(ex)) { - List list = entry.getResponses().get(key); + List list = entry.getResponses().get(requestId); list.remove(0); if (list.isEmpty()) { + entry.getResponses().remove(requestId); + } + if (entry.getResponses().isEmpty()) { responses.remove(responseQueueName, entry); } } @@ -482,11 +491,11 @@ public abstract class BaseRemoteService { } }, timeout, TimeUnit.MILLISECONDS); - final Map> entryResponses = entry.getResponses(); - List list = entryResponses.get(key); + final Map> entryResponses = entry.getResponses(); + List list = entryResponses.get(requestId); if (list == null) { - list = new ArrayList(); - entryResponses.put(key, list); + list = new ArrayList(3); + entryResponses.put(requestId, list); } Result res = new Result(responseFuture, future); @@ -523,12 +532,17 @@ public abstract class BaseRemoteService { synchronized (responses) { ResponseEntry entry = responses.get(responseQueueName); if (entry == null) { - log.error("Can't find entry " + responseQueueName); return; } - Key key = new Key(response.getId()); + RequestId key = new RequestId(response.getId()); List list = entry.getResponses().get(key); + if (list == null) { + RBlockingQueue responseQueue = redisson.getBlockingQueue(responseQueueName, codec); + responseQueue.takeAsync().addListener(this); + return; + } + Result res = list.remove(0); if (list.isEmpty()) { entry.getResponses().remove(key); @@ -572,14 +586,16 @@ public abstract class BaseRemoteService { && !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE))) throw new IllegalArgumentException("The noResult option only supports void return value"); - String requestId = generateRequestId(); + RequestId requestId = generateRequestId(); String requestQueueName = getRequestQueueName(remoteInterface); - RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, codec); - RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId, method.getName(), getMethodSignatures(method), args, optionsCopy, + RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, optionsCopy, System.currentTimeMillis()); - requestQueue.add(request); - + + RemotePromise addPromise = new RemotePromise(requestId, null); + addAsync(requestQueueName, request, addPromise); + addPromise.getAddFuture().sync(); + RBlockingQueue responseQueue = null; if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) { responseQueue = redisson.getBlockingQueue(responseQueueName, codec); @@ -641,7 +657,7 @@ public abstract class BaseRemoteService { } private RFuture tryPollAckAgainAsync(final RemoteInvocationOptions optionsCopy, - String ackName, final String requestId) { + String ackName, final RequestId requestId) { final RPromise promise = new RedissonPromise(); RFuture ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('setnx', KEYS[1], 1) == 1 then " @@ -680,7 +696,7 @@ public abstract class BaseRemoteService { return promise; } - protected void scheduleCheck(final String mapName, final String requestId, final RPromise cancelRequest) { + protected void scheduleCheck(final String mapName, final RequestId requestId, final RPromise cancelRequest) { commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { @@ -689,7 +705,7 @@ public abstract class BaseRemoteService { } RMap canceledRequests = redisson.getMap(mapName, codec); - RFuture future = canceledRequests.getAsync(requestId); + RFuture future = canceledRequests.getAsync(requestId.toString()); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -713,35 +729,47 @@ public abstract class BaseRemoteService { }, 3000, TimeUnit.MILLISECONDS); } - protected String generateRequestId() { + protected RequestId generateRequestId() { byte[] id = new byte[16]; // TODO JDK UPGRADE replace to native ThreadLocalRandom ThreadLocalRandom.current().nextBytes(id); - return ByteBufUtil.hexDump(id); + return new RequestId(id); } - protected RFuture addAsync(RBlockingQueue requestQueue, RemoteServiceRequest request, + protected RFuture addAsync(String requestQueueName, RemoteServiceRequest request, RemotePromise result) { - RFuture future = requestQueue.addAsync(request); + RFuture future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('hset', KEYS[2], ARGV[1], ARGV[2]);" + + "redis.call('rpush', KEYS[1], ARGV[1]); " + + "return 1;", + Arrays.asList(requestQueueName, requestQueueName + ":tasks"), + request.getId(), encode(request)); + result.setAddFuture(future); return future; } - protected RFuture removeAsync(RBlockingQueue requestQueue, RemoteServiceRequest request) { - return requestQueue.removeAsync(request); + protected RFuture removeAsync(String requestQueueName, RequestId taskId) { + return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('lrem', KEYS[1], 1, ARGV[1]); " + + "redis.call('hset', KEYS[2], ARGV[1]);" + + "return 1;", + Arrays.asList(requestQueueName, requestQueueName + ":tasks"), + taskId.toString()); } private void cancelExecution(RemoteInvocationOptions optionsCopy, - RemoteServiceRequest request, boolean mayInterruptIfRunning, RemotePromise remotePromise, RFuture responseFuture) { + boolean mayInterruptIfRunning, RemotePromise remotePromise, RFuture responseFuture) { RMap canceledRequests = redisson.getMap(cancelRequestMapName, codec); - canceledRequests.putAsync(request.getId(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false)); + canceledRequests.putAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false)); canceledRequests.expireAsync(60, TimeUnit.SECONDS); // subscribe for async result if it's not expected before if (!optionsCopy.isResultExpected()) { RemoteInvocationOptions options = new RemoteInvocationOptions(optionsCopy); options.expectResultWithin(60, TimeUnit.SECONDS); - awaitResultAsync(options, remotePromise, request, responseFuture); + responseFuture = poll(options.getExecutionTimeoutInMillis(), remotePromise.getRequestId(), false); + awaitResultAsync(options, remotePromise, responseFuture); } } diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 50920d8b0..aebfe6f57 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -69,6 +69,7 @@ import org.redisson.misc.Injector; import org.redisson.misc.PromiseDelegator; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; +import org.redisson.remote.RequestId; import org.redisson.remote.ResponseEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -221,8 +222,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { "local expiredTaskIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " + "if #expiredTaskIds > 0 then " + "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));" - + "local expiredTasks = redis.call('hmget', KEYS[3], unpack(expiredTaskIds));" - + "redis.call('rpush', KEYS[1], unpack(expiredTasks));" + + "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));" + "end; " // get startTime from scheduler queue head task + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " @@ -230,7 +230,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "return v[2]; " + "end " + "return nil;", - Arrays.asList(requestQueueName, schedulerQueueName, tasksName), + Arrays.asList(requestQueueName, schedulerQueueName), System.currentTimeMillis(), 100); } }; @@ -769,7 +769,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public boolean cancelTask(String taskId) { - RFuture scheduledFuture = scheduledRemoteService.cancelExecutionAsync(taskId); + RFuture scheduledFuture = scheduledRemoteService.cancelExecutionAsync(new RequestId(taskId)); return commandExecutor.get(scheduledFuture); } diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index 1145e1eed..deaf13492 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -33,7 +33,9 @@ import org.redisson.api.RRemoteService; import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; +import org.redisson.codec.CompositeCodec; import org.redisson.command.CommandExecutor; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; @@ -45,6 +47,7 @@ import org.redisson.remote.RemoteServiceKey; import org.redisson.remote.RemoteServiceMethod; import org.redisson.remote.RemoteServiceRequest; import org.redisson.remote.RemoteServiceResponse; +import org.redisson.remote.RequestId; import org.redisson.remote.ResponseEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +66,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class); private final Map beans = PlatformDependent.newConcurrentHashMap(); - private final Map, Set>> futures = PlatformDependent.newConcurrentHashMap(); + private final Map, Set>> futures = PlatformDependent.newConcurrentHashMap(); public RedissonRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap responses) { super(codec, redisson, name, commandExecutor, executorId, responses); @@ -81,19 +84,19 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS beans.remove(key); } - Set> removedFutures = futures.remove(remoteInterface); + Set> removedFutures = futures.remove(remoteInterface); if (removedFutures == null) { return; } - for (RFuture future : removedFutures) { + for (RFuture future : removedFutures) { future.cancel(false); } } @Override public int getFreeWorkers(Class remoteInterface) { - Set> futuresSet = futures.get(remoteInterface); + Set> futuresSet = futures.get(remoteInterface); return futuresSet.size(); } @@ -115,28 +118,28 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } } - Set> values = Collections.newSetFromMap(PlatformDependent., Boolean>newConcurrentHashMap()); + Set> values = Collections.newSetFromMap(PlatformDependent., Boolean>newConcurrentHashMap()); futures.put(remoteInterface, values); String requestQueueName = getRequestQueueName(remoteInterface); - RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, codec); + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, StringCodec.INSTANCE); for (int i = 0; i < workers; i++) { subscribe(remoteInterface, requestQueue, executor); } } - private void subscribe(final Class remoteInterface, final RBlockingQueue requestQueue, + private void subscribe(final Class remoteInterface, final RBlockingQueue requestQueue, final ExecutorService executor) { - Set> futuresSet = futures.get(remoteInterface); + Set> futuresSet = futures.get(remoteInterface); if (futuresSet == null) { return; } - final RFuture take = requestQueue.takeAsync(); + final RFuture take = requestQueue.takeAsync(); futuresSet.add(take); - take.addListener(new FutureListener() { + take.addListener(new FutureListener() { @Override - public void operationComplete(Future future) throws Exception { - Set> futuresSet = futures.get(remoteInterface); + public void operationComplete(Future future) throws Exception { + Set> futuresSet = futures.get(remoteInterface); if (futuresSet == null) { return; } @@ -156,63 +159,83 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS // https://github.com/mrniko/redisson/issues/493 // subscribe(remoteInterface, requestQueue); - final RemoteServiceRequest request = future.getNow(); - // check the ack only if expected - if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request - .getOptions().getAckTimeoutInMillis()) { - log.debug("request: {} has been skipped due to ackTimeout"); - // re-subscribe after a skipped ackTimeout - subscribe(remoteInterface, requestQueue, executor); - return; - } + final String requestId = future.getNow(); + RMap tasks = redisson.getMap(requestQueue.getName() + ":tasks", new CompositeCodec(StringCodec.INSTANCE, codec, codec)); + RFuture taskFuture = tasks.getAsync(requestId); + taskFuture.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + if (future.cause() instanceof RedissonShutdownException) { + return; + } + log.error("Can't process the remote service request with id " + requestId, future.cause()); + // re-subscribe after a failed takeAsync + subscribe(remoteInterface, requestQueue, executor); + return; + } + + final RemoteServiceRequest request = future.getNow(); + // check the ack only if expected + if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request + .getOptions().getAckTimeoutInMillis()) { + log.debug("request: {} has been skipped due to ackTimeout"); + // re-subscribe after a skipped ackTimeout + subscribe(remoteInterface, requestQueue, executor); + return; + } - final String responseName = getResponseQueueName(request.getExecutorId()); + final String responseName = getResponseQueueName(request.getExecutorId()); - // send the ack only if expected - if (request.getOptions().isAckExpected()) { - String ackName = getAckName(request.getId()); - RFuture ackClientsFuture = commandExecutor.evalWriteAsync(responseName, - LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "if redis.call('setnx', KEYS[1], 1) == 1 then " - + "redis.call('pexpire', KEYS[1], ARGV[2]);" - + "redis.call('rpush', KEYS[2], ARGV[1]);" -// + "redis.call('pexpire', KEYS[2], ARGV[2]);" - + "return 1;" - + "end;" - + "return 0;", - Arrays.asList(ackName, responseName), - encode(new RemoteServiceAck(request.getId())), request.getOptions().getAckTimeoutInMillis()); + // send the ack only if expected + if (request.getOptions().isAckExpected()) { + String ackName = getAckName(request.getId()); + RFuture ackClientsFuture = commandExecutor.evalWriteAsync(responseName, + LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "if redis.call('setnx', KEYS[1], 1) == 1 then " + + "redis.call('pexpire', KEYS[1], ARGV[2]);" + + "redis.call('rpush', KEYS[2], ARGV[1]);" +// + "redis.call('pexpire', KEYS[2], ARGV[2]);" + + "return 1;" + + "end;" + + "return 0;", + Arrays.asList(ackName, responseName), + encode(new RemoteServiceAck(request.getId())), request.getOptions().getAckTimeoutInMillis()); - ackClientsFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - if (future.cause() instanceof RedissonShutdownException) { - return; - } - log.error("Can't send ack for request: " + request, future.cause()); - // re-subscribe after a failed send (ack) - subscribe(remoteInterface, requestQueue, executor); - return; - } + ackClientsFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + if (future.cause() instanceof RedissonShutdownException) { + return; + } + log.error("Can't send ack for request: " + request, future.cause()); + // re-subscribe after a failed send (ack) + subscribe(remoteInterface, requestQueue, executor); + return; + } - if (!future.getNow()) { - subscribe(remoteInterface, requestQueue, executor); - return; - } + if (!future.getNow()) { + subscribe(remoteInterface, requestQueue, executor); + return; + } - executeMethod(remoteInterface, requestQueue, executor, request); - } - }); - } else { - executeMethod(remoteInterface, requestQueue, executor, request); - } + executeMethod(remoteInterface, requestQueue, executor, request); + } + }); + } else { + executeMethod(remoteInterface, requestQueue, executor, request); + } + } + }); + } }); } - private void executeMethod(final Class remoteInterface, final RBlockingQueue requestQueue, + private void executeMethod(final Class remoteInterface, final RBlockingQueue requestQueue, final ExecutorService executor, final RemoteServiceRequest request) { final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName(), request.getSignatures())); final String responseName = getResponseQueueName(request.getExecutorId()); @@ -221,7 +244,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS final AtomicReference responseHolder = new AtomicReference(); final RPromise cancelRequestFuture = new RedissonPromise(); - scheduleCheck(cancelRequestMapName, request.getId(), cancelRequestFuture); + scheduleCheck(cancelRequestMapName, new RequestId(request.getId()), cancelRequestFuture); final java.util.concurrent.Future submitFuture = executor.submit(new Runnable() { @Override @@ -257,7 +280,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } private void invokeMethod(final Class remoteInterface, - final RBlockingQueue requestQueue, final RemoteServiceRequest request, + final RBlockingQueue requestQueue, final RemoteServiceRequest request, RemoteServiceMethod method, String responseName, final ExecutorService executor, RFuture cancelRequestFuture, final AtomicReference responseHolder) { try { diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorFuture.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorFuture.java index 2489bc906..5a27b0d17 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonExecutorFuture.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorFuture.java @@ -18,6 +18,7 @@ package org.redisson.executor; import org.redisson.api.RExecutorFuture; import org.redisson.misc.PromiseDelegator; import org.redisson.misc.RPromise; +import org.redisson.remote.RequestId; /** * @@ -27,16 +28,16 @@ import org.redisson.misc.RPromise; */ public class RedissonExecutorFuture extends PromiseDelegator implements RExecutorFuture { - private final String taskId; + private final RequestId taskId; - public RedissonExecutorFuture(RPromise promise, String taskId) { + public RedissonExecutorFuture(RPromise promise, RequestId taskId) { super(promise); this.taskId = taskId; } @Override public String getTaskId() { - return taskId; + return taskId.toString(); } } diff --git a/redisson/src/main/java/org/redisson/executor/RedissonScheduledFuture.java b/redisson/src/main/java/org/redisson/executor/RedissonScheduledFuture.java index 338188918..428daa7fa 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonScheduledFuture.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonScheduledFuture.java @@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.RScheduledFuture; import org.redisson.misc.PromiseDelegator; +import org.redisson.remote.RequestId; /** * @@ -30,7 +31,7 @@ import org.redisson.misc.PromiseDelegator; public class RedissonScheduledFuture extends PromiseDelegator implements RScheduledFuture { private final long scheduledExecutionTime; - private final String taskId; + private final RequestId taskId; public RedissonScheduledFuture(RemotePromise promise, long scheduledExecutionTime) { super(promise); @@ -62,7 +63,7 @@ public class RedissonScheduledFuture extends PromiseDelegator implements R @Override public String getTaskId() { - return taskId; + return taskId.toString(); } } diff --git a/redisson/src/main/java/org/redisson/executor/RemotePromise.java b/redisson/src/main/java/org/redisson/executor/RemotePromise.java index b32784abf..1925fdd42 100644 --- a/redisson/src/main/java/org/redisson/executor/RemotePromise.java +++ b/redisson/src/main/java/org/redisson/executor/RemotePromise.java @@ -17,6 +17,7 @@ package org.redisson.executor; import org.redisson.api.RFuture; import org.redisson.misc.RedissonPromise; +import org.redisson.remote.RequestId; /** * @@ -25,15 +26,21 @@ import org.redisson.misc.RedissonPromise; */ public class RemotePromise extends RedissonPromise { - private String requestId; + private final Object param; + private final RequestId requestId; private RFuture addFuture; - public RemotePromise(String requestId) { + public RemotePromise(RequestId requestId, Object param) { super(); this.requestId = requestId; + this.param = param; } - public String getRequestId() { + public

P getParam() { + return (P) param; + } + + public RequestId getRequestId() { return requestId; } diff --git a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java index 82a8880f3..9107a706b 100644 --- a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java +++ b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java @@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.redisson.RedissonExecutorService; -import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; import org.redisson.api.RemoteInvocationOptions; @@ -30,6 +29,7 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; import org.redisson.remote.RRemoteServiceResponse; import org.redisson.remote.RemoteServiceRequest; +import org.redisson.remote.RequestId; import org.redisson.remote.ResponseEntry; /** @@ -39,7 +39,7 @@ import org.redisson.remote.ResponseEntry; */ public class ScheduledTasksService extends TasksService { - private String requestId; + private RequestId requestId; private String schedulerQueueName; private String schedulerChannelName; @@ -47,7 +47,7 @@ public class ScheduledTasksService extends TasksService { super(codec, redisson, name, commandExecutor, redissonId, responses); } - public void setRequestId(String requestId) { + public void setRequestId(RequestId requestId) { this.requestId = requestId; } @@ -60,7 +60,7 @@ public class ScheduledTasksService extends TasksService { } @Override - protected RFuture addAsync(RBlockingQueue requestQueue, RemoteServiceRequest request) { + protected RFuture addAsync(String requestQueueName, RemoteServiceRequest request) { int requestIndex = 0; if ("scheduleCallable".equals(request.getMethodName()) || "scheduleRunnable".equals(request.getMethodName())) { @@ -114,32 +114,38 @@ public class ScheduledTasksService extends TasksService { startTime, request.getId(), encode(request)); } + @Override + protected Object getParam(RemoteServiceRequest request) { + Long startTime = 0L; + if (request != null && request.getArgs() != null && request.getArgs().length > 3) { + startTime = (Long)request.getArgs()[3]; + } + return startTime; + } + @Override protected void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise result, - final RemoteServiceRequest request, final RFuture responseFuture) { + final RFuture responseFuture) { if (!optionsCopy.isResultExpected()) { return; } - Long startTime = 0L; - if (request != null && request.getArgs() != null && request.getArgs().length > 3) { - startTime = (Long)request.getArgs()[3]; - } + long startTime = result.getParam(); long delay = startTime - System.currentTimeMillis(); if (delay > 0) { commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { @Override public void run() { - ScheduledTasksService.super.awaitResultAsync(optionsCopy, result, request, responseFuture); + ScheduledTasksService.super.awaitResultAsync(optionsCopy, result, responseFuture); } }, (long)(delay - delay*0.10), TimeUnit.MILLISECONDS); } else { - super.awaitResultAsync(optionsCopy, result, request, responseFuture); + super.awaitResultAsync(optionsCopy, result, responseFuture); } } @Override - protected RFuture removeAsync(RBlockingQueue requestQueue, RemoteServiceRequest request) { + protected RFuture removeAsync(String requestQueueName, RequestId taskId) { return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // remove from scheduler queue "if redis.call('zrem', KEYS[2], ARGV[1]) > 0 then " @@ -155,7 +161,7 @@ public class ScheduledTasksService extends TasksService { + "end;" + "local task = redis.call('hget', KEYS[6], ARGV[1]); " // remove from executor queue - + "if task ~= false and redis.call('lrem', KEYS[1], 1, task) > 0 then " + + "if task ~= false and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then " + "redis.call('hdel', KEYS[6], ARGV[1]); " + "if redis.call('decr', KEYS[3]) == 0 then " + "redis.call('del', KEYS[3]);" @@ -169,12 +175,12 @@ public class ScheduledTasksService extends TasksService { // delete scheduled task + "redis.call('hdel', KEYS[6], ARGV[1]); " + "return 0;", - Arrays.asList(requestQueue.getName(), schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName), - request.getId(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); + Arrays.asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName), + taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); } @Override - protected String generateRequestId() { + protected RequestId generateRequestId() { if (requestId == null) { return super.generateRequestId(); } diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index bb2810fe6..c07efa8a6 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -31,6 +31,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; import org.redisson.misc.Injector; +import org.redisson.remote.RequestId; import org.redisson.remote.ResponseEntry; import io.netty.buffer.ByteBuf; @@ -137,7 +138,7 @@ public class TasksRunnerService implements RemoteExecutorService { scheduledRemoteService.setSchedulerQueueName(schedulerQueueName); scheduledRemoteService.setSchedulerChannelName(schedulerChannelName); scheduledRemoteService.setTasksName(tasksName); - scheduledRemoteService.setRequestId(requestId); + scheduledRemoteService.setRequestId(new RequestId(requestId)); RemoteExecutorServiceAsync asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); return asyncScheduledServiceAtFixed; } diff --git a/redisson/src/main/java/org/redisson/executor/TasksService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java index 5dd665b1e..5ae2c94e5 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksService.java @@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit; import org.redisson.BaseRemoteService; import org.redisson.RedissonExecutorService; -import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; import org.redisson.api.RMap; import org.redisson.api.RedissonClient; @@ -34,6 +33,7 @@ import org.redisson.misc.RedissonPromise; import org.redisson.remote.RemoteServiceCancelRequest; import org.redisson.remote.RemoteServiceCancelResponse; import org.redisson.remote.RemoteServiceRequest; +import org.redisson.remote.RequestId; import org.redisson.remote.ResponseEntry; import io.netty.util.concurrent.Future; @@ -72,10 +72,10 @@ public class TasksService extends BaseRemoteService { } @Override - protected final RFuture addAsync(RBlockingQueue requestQueue, + protected final RFuture addAsync(String requestQueueName, RemoteServiceRequest request, RemotePromise result) { final RPromise promise = new RedissonPromise(); - RFuture future = addAsync(requestQueue, request); + RFuture future = addAsync(requestQueueName, request); result.setAddFuture(future); future.addListener(new FutureListener() { @@ -102,26 +102,26 @@ public class TasksService extends BaseRemoteService { return commandExecutor; } - protected RFuture addAsync(RBlockingQueue requestQueue, RemoteServiceRequest request) { + protected RFuture addAsync(String requestQueueName, RemoteServiceRequest request) { request.getArgs()[3] = request.getId(); return getAddCommandExecutor().evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('exists', KEYS[2]) == 0 then " - + "redis.call('rpush', KEYS[3], ARGV[2]); " + "redis.call('hset', KEYS[4], ARGV[1], ARGV[2]);" + + "redis.call('rpush', KEYS[3], ARGV[1]); " + "redis.call('incr', KEYS[1]);" + "return 1;" + "end;" + "return 0;", - Arrays.asList(tasksCounterName, statusName, requestQueue.getName(), tasksName), + Arrays.asList(tasksCounterName, statusName, requestQueueName, tasksName), request.getId(), encode(request)); } @Override - protected RFuture removeAsync(RBlockingQueue requestQueue, RemoteServiceRequest request) { + protected RFuture removeAsync(String requestQueueName, RequestId taskId) { return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local task = redis.call('hget', KEYS[5], ARGV[1]); " + - "if task ~= false and redis.call('lrem', KEYS[1], 1, task) > 0 then " + "if task ~= false and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then " + "redis.call('hdel', KEYS[5], ARGV[1]); " + "if redis.call('decr', KEYS[2]) == 0 then " + "redis.call('del', KEYS[2]);" @@ -132,13 +132,12 @@ public class TasksService extends BaseRemoteService { + "end;" + "return 1;" + "end;" - + "redis.call('hdel', KEYS[5], ARGV[1]); " + "return 0;", - Arrays.asList(requestQueue.getName(), tasksCounterName, statusName, terminationTopicName, tasksName), - request.getId(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); + Arrays.asList(requestQueueName, tasksCounterName, statusName, terminationTopicName, tasksName), + taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); } - public RFuture cancelExecutionAsync(final String requestId) { + public RFuture cancelExecutionAsync(final RequestId requestId) { final Class syncInterface = RemoteExecutorService.class; if (!redisson.getMap(tasksName, LongCodec.INSTANCE).containsKey(requestId)) { @@ -148,10 +147,7 @@ public class TasksService extends BaseRemoteService { final RPromise result = new RedissonPromise(); String requestQueueName = getRequestQueueName(syncInterface); - RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, codec); - - RemoteServiceRequest request = new RemoteServiceRequest(requestId); - RFuture removeFuture = removeAsync(requestQueue, request); + RFuture removeFuture = removeAsync(requestQueueName, requestId); removeFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -164,7 +160,7 @@ public class TasksService extends BaseRemoteService { result.trySuccess(true); } else { RMap canceledRequests = redisson.getMap(cancelRequestMapName, codec); - canceledRequests.putAsync(requestId, new RemoteServiceCancelRequest(true, true)); + canceledRequests.putAsync(requestId.toString(), new RemoteServiceCancelRequest(true, true)); canceledRequests.expireAsync(60, TimeUnit.SECONDS); final RPromise response = new RedissonPromise(); diff --git a/redisson/src/main/java/org/redisson/remote/RequestId.java b/redisson/src/main/java/org/redisson/remote/RequestId.java new file mode 100644 index 000000000..9ef17e73e --- /dev/null +++ b/redisson/src/main/java/org/redisson/remote/RequestId.java @@ -0,0 +1,84 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.remote; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; + +/** + * + * @author Nikita Koksharov + * + */ +public class RequestId { + + private final long id0; + private final long id1; + + public RequestId(String id) { + this(ByteBufUtil.decodeHexDump(id)); + } + + public RequestId(byte[] buf) { + ByteBuf b = Unpooled.wrappedBuffer(buf); + try { + id0 = b.readLong(); + id1 = b.readLong(); + } finally { + b.release(); + } + } + + @Override + public String toString() { + ByteBuf id = Unpooled.buffer(16); + try { + id.writeLong(id0); + id.writeLong(id1); + return ByteBufUtil.hexDump(id); + } finally { + id.release(); + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (id0 ^ (id0 >>> 32)); + result = prime * result + (int) (id1 ^ (id1 >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + RequestId other = (RequestId) obj; + if (id0 != other.id0) + return false; + if (id1 != other.id1) + return false; + return true; + } + + +} diff --git a/redisson/src/main/java/org/redisson/remote/ResponseEntry.java b/redisson/src/main/java/org/redisson/remote/ResponseEntry.java index 694588c7c..831f002f1 100644 --- a/redisson/src/main/java/org/redisson/remote/ResponseEntry.java +++ b/redisson/src/main/java/org/redisson/remote/ResponseEntry.java @@ -23,10 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.misc.RPromise; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; - /** * * @author Nikita Koksharov @@ -34,49 +30,6 @@ import io.netty.buffer.Unpooled; */ public class ResponseEntry { - public static class Key { - - private final long id0; - private final long id1; - - public Key(String id) { - byte[] buf = ByteBufUtil.decodeHexDump(id); - ByteBuf b = Unpooled.wrappedBuffer(buf); - try { - id0 = b.readLong(); - id1 = b.readLong(); - } finally { - b.release(); - } - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (id0 ^ (id0 >>> 32)); - result = prime * result + (int) (id1 ^ (id1 >>> 32)); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - Key other = (Key) obj; - if (id0 != other.id0) - return false; - if (id1 != other.id1) - return false; - return true; - } - - } - public static class Result { private final RPromise promise; @@ -98,10 +51,10 @@ public class ResponseEntry { } - private final Map> responses = new HashMap>(); + private final Map> responses = new HashMap>(); private final AtomicBoolean started = new AtomicBoolean(); - public Map> getResponses() { + public Map> getResponses() { return responses; }