From 5264cd16f35f977c1ba1c796176267808f41906f Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 7 Jan 2020 12:38:35 +0300 Subject: [PATCH] Feature - tryExecute and tryExecuteAsync methods added to RRemoteService object. #2525 --- .../org/redisson/RedissonRemoteService.java | 211 +++++++++++------- .../java/org/redisson/api/RRemoteService.java | 28 ++- .../RedissonExecutorRemoteService.java | 15 +- 3 files changed, 168 insertions(+), 86 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index 93dbcaa85..9a0830a72 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -15,22 +15,7 @@ */ package org.redisson; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.redisson.api.RBlockingQueue; -import org.redisson.api.RBlockingQueueAsync; -import org.redisson.api.RFuture; -import org.redisson.api.RList; -import org.redisson.api.RMap; -import org.redisson.api.RRemoteService; +import org.redisson.api.*; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; @@ -39,20 +24,18 @@ import org.redisson.command.CommandAsyncExecutor; import org.redisson.executor.RemotePromise; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; -import org.redisson.remote.BaseRemoteService; -import org.redisson.remote.RRemoteServiceResponse; -import org.redisson.remote.RemoteServiceAck; -import org.redisson.remote.RemoteServiceCancelRequest; -import org.redisson.remote.RemoteServiceCancelResponse; -import org.redisson.remote.RemoteServiceKey; -import org.redisson.remote.RemoteServiceMethod; -import org.redisson.remote.RemoteServiceRequest; -import org.redisson.remote.RemoteServiceResponse; -import org.redisson.remote.RequestId; -import org.redisson.remote.ResponseEntry; +import org.redisson.remote.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + /** * * @author Nikita Koksharov @@ -85,13 +68,17 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class); - private final Map beans = new ConcurrentHashMap<>(); private final Map, Entry> remoteMap = new ConcurrentHashMap<>(); public RedissonRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap responses) { super(codec, name, commandExecutor, executorId, responses); } - + + public String getRequestTasksMapName(Class remoteInterface) { + String queue = getRequestQueueName(remoteInterface); + return queue + ":tasks"; + } + @Override protected RFuture addAsync(String requestQueueName, RemoteServiceRequest request, RemotePromise result) { @@ -109,9 +96,11 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS @Override protected RFuture removeAsync(String requestQueueName, RequestId taskId) { return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "redis.call('lrem', KEYS[1], 1, ARGV[1]); " - + "redis.call('hdel', KEYS[2], ARGV[1]);" - + "return 1;", + "if redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then " + + "redis.call('hdel', KEYS[2], ARGV[1]);" + + "return 1;" + + "end;" + + "return 0;", Arrays.asList(requestQueueName, requestQueueName + ":tasks"), taskId.toString()); } @@ -123,11 +112,6 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS @Override public void deregister(Class remoteInterface) { - for (Method method : remoteInterface.getMethods()) { - RemoteServiceKey key = new RemoteServiceKey(remoteInterface, method.getName(), getMethodSignature(method)); - beans.remove(key); - } - Entry entry = remoteMap.remove(remoteInterface); if (entry != null && entry.getFuture() != null) { entry.getFuture().cancel(false); @@ -164,23 +148,87 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS if (workers < 1) { throw new IllegalArgumentException("executorsAmount can't be lower than 1"); } - for (Method method : remoteInterface.getMethods()) { - RemoteServiceMethod value = new RemoteServiceMethod(method, object); - RemoteServiceKey key = new RemoteServiceKey(remoteInterface, method.getName(), getMethodSignature(method)); - if (beans.put(key, value) != null) { - return; - } - } - remoteMap.put(remoteInterface, new Entry(workers)); + if (remoteMap.putIfAbsent(remoteInterface, new Entry(workers)) != null) { + return; + } String requestQueueName = getRequestQueueName(remoteInterface); RBlockingQueue requestQueue = getBlockingQueue(requestQueueName, StringCodec.INSTANCE); - subscribe(remoteInterface, requestQueue, executor); + subscribe(remoteInterface, requestQueue, executor, object); + } + + @Override + public boolean tryExecute(Class remoteInterface, T object, long timeout, TimeUnit timeUnit) throws InterruptedException { + String requestQueueName = getRequestQueueName(remoteInterface); + RBlockingQueue requestQueue = getBlockingQueue(requestQueueName, StringCodec.INSTANCE); + + String requestId = requestQueue.poll(timeout, timeUnit); + if (requestId == null) { + return false; + } + + RMap tasks = getMap(requestQueue.getName() + ":tasks"); + RFuture taskFuture = getTask(requestId, tasks); + commandExecutor.getInterrupted(taskFuture); + + RemoteServiceRequest request = taskFuture.getNow(); + if (request == null) { + throw new IllegalStateException("Task can't be found for request: " + requestId); + } + + RFuture r = executeMethod(remoteInterface, requestQueue, commandExecutor.getConnectionManager().getExecutor(), request, object); + commandExecutor.getInterrupted(r); + return true; + } + + @Override + public RFuture tryExecuteAsync(Class remoteInterface, T object) { + RPromise result = new RedissonPromise<>(); + String requestQueueName = getRequestQueueName(remoteInterface); + + RBlockingQueue requestQueue = getBlockingQueue(requestQueueName, StringCodec.INSTANCE); + RFuture poll = requestQueue.pollAsync(); + poll.onComplete((requestId, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + if (requestId == null) { + result.trySuccess(false); + return; + } + + RMap tasks = getMap(requestQueue.getName() + ":tasks"); + RFuture taskFuture = getTask(requestId, tasks); + taskFuture.onComplete((request, exc) -> { + if (exc != null) { + result.tryFailure(exc); + return; + } + + if (request == null) { + result.tryFailure(new IllegalStateException("Task can't be found for request: " + requestId)); + return; + } + + RFuture future = executeMethod(remoteInterface, requestQueue, commandExecutor.getConnectionManager().getExecutor(), request, object); + future.onComplete((r, ex) -> { + if (ex != null) { + result.tryFailure(ex); + return; + } + + result.trySuccess(true); + }); + }); + }); + return result; } private void subscribe(Class remoteInterface, RBlockingQueue requestQueue, - ExecutorService executor) { + ExecutorService executor, Object bean) { Entry entry = remoteMap.get(remoteInterface); if (entry == null) { return; @@ -199,7 +247,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } log.error("Can't process the remote service request.", e); // re-subscribe after a failed takeAsync - subscribe(remoteInterface, requestQueue, executor); + subscribe(remoteInterface, requestQueue, executor, bean); return; } @@ -212,7 +260,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } if (entry.getCounter().decrementAndGet() > 0) { - subscribe(remoteInterface, requestQueue, executor); + subscribe(remoteInterface, requestQueue, executor, bean); } RMap tasks = getMap(requestQueue.getName() + ":tasks"); @@ -225,7 +273,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS log.error("Can't process the remote service request with id " + requestId, exc); // re-subscribe after a failed takeAsync - resubscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor, bean); return; } @@ -233,7 +281,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS log.debug("Task can't be found for request: {}", requestId); // re-subscribe after a skipped ackTimeout - resubscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor, bean); return; } @@ -244,7 +292,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS log.debug("request: {} has been skipped due to ackTimeout. Elapsed time: {}ms", request.getId(), elapsedTime); // re-subscribe after a skipped ackTimeout - resubscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor, bean); return; } @@ -275,12 +323,12 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS log.error("Can't send ack for request: " + request, ex); // re-subscribe after a failed send (ack) - resubscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor, bean); return; } if (!r) { - resubscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor, bean); return; } @@ -295,39 +343,42 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS log.error("Can't send ack for request: " + request, exce); // re-subscribe after a failed send (ack) - resubscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor, bean); return; } if (!res) { - resubscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor, bean); return; } - executeMethod(remoteInterface, requestQueue, executor, request); + executeMethod(remoteInterface, requestQueue, executor, request, bean); }); }); } else { - executeMethod(remoteInterface, requestQueue, executor, request); + executeMethod(remoteInterface, requestQueue, executor, request, bean); } }); }); } - private void executeMethod(Class remoteInterface, RBlockingQueue requestQueue, - ExecutorService executor, RemoteServiceRequest request) { - RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName(), request.getSignature())); + private RFuture executeMethod(Class remoteInterface, RBlockingQueue requestQueue, + ExecutorService executor, RemoteServiceRequest request, Object bean) { + RemoteServiceMethod method = Arrays.stream(remoteInterface.getMethods()) + .filter(m -> m.getName().equals(request.getMethodName()) + && Arrays.equals(getMethodSignature(m), request.getSignature())) + .map(m -> new RemoteServiceMethod(m, bean)) + .findFirst().get(); + String responseName = getResponseQueueName(request.getExecutorId()); - - AtomicReference responseHolder = new AtomicReference(); - - RPromise cancelRequestFuture = new RedissonPromise(); + RPromise responsePromise = new RedissonPromise<>(); + RPromise cancelRequestFuture = new RedissonPromise<>(); scheduleCheck(cancelRequestMapName, new RequestId(request.getId()), cancelRequestFuture); - + java.util.concurrent.Future submitFuture = executor.submit(() -> { invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, - cancelRequestFuture, responseHolder); + cancelRequestFuture, responsePromise); }); cancelRequestFuture.onComplete((r, e) -> { @@ -338,7 +389,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS boolean res = submitFuture.cancel(r.isMayInterruptIfRunning()); if (res) { RemoteServiceCancelResponse response = new RemoteServiceCancelResponse(request.getId(), true); - if (!responseHolder.compareAndSet(null, response)) { + if (!responsePromise.trySuccess(response)) { response = new RemoteServiceCancelResponse(request.getId(), false); } @@ -350,20 +401,22 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } } }); + + return responsePromise; } protected void invokeMethod(Class remoteInterface, RBlockingQueue requestQueue, RemoteServiceRequest request, RemoteServiceMethod method, String responseName, ExecutorService executor, - RFuture cancelRequestFuture, AtomicReference responseHolder) { + RFuture cancelRequestFuture, RPromise responsePromise) { try { Object result = method.getMethod().invoke(method.getBean(), request.getArgs()); RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), result); - responseHolder.compareAndSet(null, response); + responsePromise.trySuccess(response); } catch (Exception e) { RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), e.getCause()); - responseHolder.compareAndSet(null, response); + responsePromise.trySuccess(response); log.error("Can't execute: " + request, e); } @@ -373,7 +426,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS // send the response only if expected or task was canceled if (request.getOptions().isResultExpected() - || responseHolder.get() instanceof RemoteServiceCancelResponse) { + || responsePromise.getNow() instanceof RemoteServiceCancelResponse) { long timeout = 60 * 1000; if (request.getOptions().getExecutionTimeoutInMillis() != null) { timeout = request.getOptions().getExecutionTimeoutInMillis(); @@ -381,7 +434,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS RBlockingQueueAsync queue = getBlockingQueue(responseName, codec); try { - RFuture clientsFuture = queue.putAsync(responseHolder.get()); + RFuture clientsFuture = queue.putAsync(responsePromise.getNow()); queue.expireAsync(timeout, TimeUnit.MILLISECONDS); clientsFuture.onComplete((res, e) -> { @@ -389,25 +442,25 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS if (e instanceof RedissonShutdownException) { return; } - log.error("Can't send response: " + responseHolder.get() + " for request: " + request, e); + log.error("Can't send response: " + responsePromise.getNow() + " for request: " + request, e); } - resubscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor, method.getBean()); }); } catch (Exception e) { - log.error("Can't send response: " + responseHolder.get() + " for request: " + request, e); + log.error("Can't send response: " + responsePromise.getNow() + " for request: " + request, e); } } else { - resubscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor, method.getBean()); } } private void resubscribe(Class remoteInterface, RBlockingQueue requestQueue, - ExecutorService executor) { + ExecutorService executor, Object bean) { Entry entry = remoteMap.get(remoteInterface); if (entry != null && entry.getCounter().getAndIncrement() == 0) { // re-subscribe anyways after the method invocation - subscribe(remoteInterface, requestQueue, executor); + subscribe(remoteInterface, requestQueue, executor, bean); } } diff --git a/redisson/src/main/java/org/redisson/api/RRemoteService.java b/redisson/src/main/java/org/redisson/api/RRemoteService.java index 6b5749cb7..1491e8e0a 100644 --- a/redisson/src/main/java/org/redisson/api/RRemoteService.java +++ b/redisson/src/main/java/org/redisson/api/RRemoteService.java @@ -111,7 +111,33 @@ public interface RRemoteService { * @param remoteInterface - remote service interface */ void deregister(Class remoteInterface); - + + /** + * Tries to execute one awaiting remote request. + * Waits up to timeout if necessary until remote request became available. + * + * @param remoteInterface - remote service interface + * @param object - remote service object + * @param timeout - maximum wait time until remote request became available + * @param timeUnit - time unit + * @param - type of remote service + * @return true if method was successfully executed and + * false if timeout reached before execution + * @throws InterruptedException - if the thread is interrupted + */ + boolean tryExecute(Class remoteInterface, T object, long timeout, TimeUnit timeUnit) throws InterruptedException; + + /** + * Tries to execute one awaiting remote request. + * + * @param remoteInterface - remote service interface + * @param object - remote service object + * @param - type of remote service + * @return true if method was successfully executed and + * false if timeout reached before execution + */ + RFuture tryExecuteAsync(Class remoteInterface, T object); + /** * Get remote service object for remote invocations. *

diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java index 58680d1dd..c138c43c9 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java @@ -17,7 +17,9 @@ package org.redisson.executor; import org.redisson.RedissonExecutorService; import org.redisson.RedissonRemoteService; -import org.redisson.api.*; +import org.redisson.api.RBlockingQueue; +import org.redisson.api.RFuture; +import org.redisson.api.RMap; import org.redisson.api.executor.*; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; @@ -30,7 +32,6 @@ import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** @@ -86,7 +87,9 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { } @Override - protected void invokeMethod(Class remoteInterface, RBlockingQueue requestQueue, RemoteServiceRequest request, RemoteServiceMethod method, String responseName, ExecutorService executor, RFuture cancelRequestFuture, AtomicReference responseHolder) { + protected void invokeMethod(Class remoteInterface, RBlockingQueue requestQueue, RemoteServiceRequest request, + RemoteServiceMethod method, String responseName, ExecutorService executor, RFuture cancelRequestFuture, + RPromise responsePromise) { startedListeners.stream().forEach(l -> l.onStarted(request.getId())); if (taskTimeout > 0) { @@ -94,10 +97,10 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { ((RPromise) cancelRequestFuture).trySuccess(new RemoteServiceCancelRequest(true, false)); }, taskTimeout, TimeUnit.MILLISECONDS); } - super.invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, cancelRequestFuture, responseHolder); + super.invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, cancelRequestFuture, responsePromise); - if (responseHolder.get() instanceof RemoteServiceResponse) { - RemoteServiceResponse response = (RemoteServiceResponse) responseHolder.get(); + if (responsePromise.getNow() instanceof RemoteServiceResponse) { + RemoteServiceResponse response = (RemoteServiceResponse) responsePromise.getNow(); if (response.getError() == null) { successListeners.stream().forEach(l -> l.onSucceeded(request.getId(), response.getResult())); } else {