From 8a14faec43334e911b6c6dcf23bf933c48be23a8 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 18 Dec 2017 14:53:00 +0300 Subject: [PATCH] Fixed - Redisson remote services can't ack when running redis in cluster mode. #1191 --- .../java/org/redisson/BaseRemoteService.java | 86 ++++++++----------- .../org/redisson/RedissonRemoteService.java | 67 +++++++++++++-- 2 files changed, 95 insertions(+), 58 deletions(-) diff --git a/redisson/src/main/java/org/redisson/BaseRemoteService.java b/redisson/src/main/java/org/redisson/BaseRemoteService.java index 6f292f716..c17eadaf2 100644 --- a/redisson/src/main/java/org/redisson/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/BaseRemoteService.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; +import org.redisson.api.RList; import org.redisson.api.RMap; import org.redisson.api.RedissonClient; import org.redisson.api.RemoteInvocationOptions; @@ -58,7 +59,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; @@ -196,24 +196,7 @@ public abstract class BaseRemoteService { final String requestQueueName = getRequestQueueName(syncInterface); - final RFuture ackFuture; - if (optionsCopy.isAckExpected()) { - ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId, false); - } else { - ackFuture = null; - } - - final RPromise responseFuture; - if (optionsCopy.isResultExpected()) { - responseFuture = poll(optionsCopy.getExecutionTimeoutInMillis(), requestId, false); - } else { - responseFuture = null; - } - - RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, - optionsCopy, System.currentTimeMillis()); - - final Long ackTimeout = request.getOptions().getAckTimeoutInMillis(); + final Long ackTimeout = optionsCopy.getAckTimeoutInMillis(); final RemotePromise result = new RemotePromise(requestId) { @@ -233,17 +216,20 @@ public abstract class BaseRemoteService { RFuture future = commandExecutor.evalWriteAsync(responseQueueName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('setnx', KEYS[1], 1) == 1 then " - + "redis.call('pexpire', KEYS[1], ARGV[2]);" - + "redis.call('lrem', KEYS[3], 1, ARGV[1]);" + + "redis.call('pexpire', KEYS[1], ARGV[1]);" +// + "redis.call('lrem', KEYS[3], 1, ARGV[1]);" // + "redis.call('pexpire', KEYS[2], ARGV[2]);" + "return 1;" + "end;" + "return 0;", - Arrays. asList(ackName, responseQueueName, requestQueueName), - requestId, ackTimeout); - + Arrays. asList(ackName), +// Arrays. asList(ackName, responseQueueName, requestQueueName), + ackTimeout); + boolean ackNotSent = commandExecutor.get(future); if (ackNotSent) { + RList list = redisson.getList(requestQueueName, LongCodec.INSTANCE); + list.remove(requestId.toString()); super.cancel(mayInterruptIfRunning); return true; } @@ -269,7 +255,7 @@ public abstract class BaseRemoteService { return false; } - cancelExecution(optionsCopy, mayInterruptIfRunning, this, responseFuture); + cancelExecution(optionsCopy, mayInterruptIfRunning, this); try { awaitUninterruptibly(60, TimeUnit.SECONDS); @@ -280,6 +266,23 @@ public abstract class BaseRemoteService { } }; + RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, + optionsCopy, System.currentTimeMillis()); + + final RFuture ackFuture; + if (optionsCopy.isAckExpected()) { + ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId, false); + } else { + ackFuture = null; + } + + final RPromise responseFuture; + if (optionsCopy.isResultExpected()) { + responseFuture = poll(optionsCopy.getExecutionTimeoutInMillis(), requestId, false); + } else { + responseFuture = null; + } + RFuture addFuture = addAsync(requestQueueName, request, result); addFuture.addListener(new FutureListener() { @@ -589,12 +592,10 @@ public abstract class BaseRemoteService { RequestId requestId = generateRequestId(); String requestQueueName = getRequestQueueName(remoteInterface); + RemotePromise addPromise = new RemotePromise(requestId); RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, optionsCopy, System.currentTimeMillis()); - - RemotePromise addPromise = new RemotePromise(requestId); - addAsync(requestQueueName, request, addPromise); - addPromise.getAddFuture().sync(); + addAsync(requestQueueName, request, addPromise).sync(); RBlockingQueue responseQueue = null; if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) { @@ -736,30 +737,13 @@ public abstract class BaseRemoteService { return new RequestId(id); } - protected RFuture addAsync(String requestQueueName, RemoteServiceRequest request, - RemotePromise result) { - RFuture future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "redis.call('hset', KEYS[2], ARGV[1], ARGV[2]);" - + "redis.call('rpush', KEYS[1], ARGV[1]); " - + "return 1;", - Arrays.asList(requestQueueName, requestQueueName + ":tasks"), - request.getId(), encode(request)); - - result.setAddFuture(future); - return future; - } + protected abstract RFuture addAsync(String requestQueueName, RemoteServiceRequest request, + RemotePromise result); - protected RFuture removeAsync(String requestQueueName, RequestId taskId) { - return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "redis.call('lrem', KEYS[1], 1, ARGV[1]); " - + "redis.call('hset', KEYS[2], ARGV[1]);" - + "return 1;", - Arrays.asList(requestQueueName, requestQueueName + ":tasks"), - taskId.toString()); - } + protected abstract RFuture removeAsync(String requestQueueName, RequestId taskId); private void cancelExecution(RemoteInvocationOptions optionsCopy, - boolean mayInterruptIfRunning, RemotePromise remotePromise, RFuture responseFuture) { + boolean mayInterruptIfRunning, RemotePromise remotePromise) { RMap canceledRequests = redisson.getMap(cancelRequestMapName, codec); canceledRequests.putAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false)); canceledRequests.expireAsync(60, TimeUnit.SECONDS); @@ -768,7 +752,7 @@ public abstract class BaseRemoteService { if (!optionsCopy.isResultExpected()) { RemoteInvocationOptions options = new RemoteInvocationOptions(optionsCopy); options.expectResultWithin(60, TimeUnit.SECONDS); - responseFuture = poll(options.getExecutionTimeoutInMillis(), remotePromise.getRequestId(), false); + RFuture responseFuture = poll(options.getExecutionTimeoutInMillis(), remotePromise.getRequestId(), false); awaitResultAsync(options, remotePromise, responseFuture); } } diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index 0eafc41bf..287f783e9 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.redisson.api.RBlockingQueue; import org.redisson.api.RBlockingQueueAsync; import org.redisson.api.RFuture; +import org.redisson.api.RList; import org.redisson.api.RMap; import org.redisson.api.RRemoteService; import org.redisson.api.RedissonClient; @@ -37,6 +38,7 @@ import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.codec.CompositeCodec; import org.redisson.command.CommandExecutor; +import org.redisson.executor.RemotePromise; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.remote.RRemoteServiceResponse; @@ -71,6 +73,30 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS public RedissonRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap responses) { super(codec, redisson, name, commandExecutor, executorId, responses); } + + @Override + protected RFuture addAsync(String requestQueueName, RemoteServiceRequest request, + RemotePromise result) { + RFuture future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('hset', KEYS[2], ARGV[1], ARGV[2]);" + + "redis.call('rpush', KEYS[1], ARGV[1]); " + + "return 1;", + Arrays.asList(requestQueueName, requestQueueName + ":tasks"), + request.getId(), encode(request)); + + result.setAddFuture(future); + return future; + } + + @Override + protected RFuture removeAsync(String requestQueueName, RequestId taskId) { + return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('lrem', KEYS[1], 1, ARGV[1]); " + + "redis.call('hset', KEYS[2], ARGV[1]);" + + "return 1;", + Arrays.asList(requestQueueName, requestQueueName + ":tasks"), + taskId.toString()); + } @Override public void register(Class remoteInterface, T object) { @@ -177,10 +203,11 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } final RemoteServiceRequest request = future.getNow(); + long elapsedTime = System.currentTimeMillis() - request.getDate(); // check the ack only if expected - if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request + if (request.getOptions().isAckExpected() && elapsedTime > request .getOptions().getAckTimeoutInMillis()) { - log.debug("request: {} has been skipped due to ackTimeout"); + log.debug("request: {} has been skipped due to ackTimeout. Elapsed time: {}ms", request.getId(), elapsedTime); // re-subscribe after a skipped ackTimeout subscribe(remoteInterface, requestQueue, executor); return; @@ -194,14 +221,16 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS RFuture ackClientsFuture = commandExecutor.evalWriteAsync(responseName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('setnx', KEYS[1], 1) == 1 then " - + "redis.call('pexpire', KEYS[1], ARGV[2]);" - + "redis.call('rpush', KEYS[2], ARGV[1]);" + + "redis.call('pexpire', KEYS[1], ARGV[1]);" +// + "redis.call('rpush', KEYS[2], ARGV[1]);" // + "redis.call('pexpire', KEYS[2], ARGV[2]);" + "return 1;" + "end;" + "return 0;", - Arrays.asList(ackName, responseName), - encode(new RemoteServiceAck(request.getId())), request.getOptions().getAckTimeoutInMillis()); + Arrays.asList(ackName), + request.getOptions().getAckTimeoutInMillis()); +// Arrays.asList(ackName, responseName), +// encode(new RemoteServiceAck(request.getId())), request.getOptions().getAckTimeoutInMillis()); ackClientsFuture.addListener(new FutureListener() { @Override @@ -220,8 +249,32 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS subscribe(remoteInterface, requestQueue, executor); return; } + + + RList list = redisson.getList(responseName, codec); + RFuture addFuture = list.addAsync(new RemoteServiceAck(request.getId())); + addFuture.addListener(new FutureListener() { - executeMethod(remoteInterface, requestQueue, executor, request); + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + if (future.cause() instanceof RedissonShutdownException) { + return; + } + log.error("Can't send ack for request: " + request, future.cause()); + // re-subscribe after a failed send (ack) + subscribe(remoteInterface, requestQueue, executor); + return; + } + + if (!future.getNow()) { + subscribe(remoteInterface, requestQueue, executor); + return; + } + + executeMethod(remoteInterface, requestQueue, executor, request); + } + }); } }); } else {