Feature - tryExecute and tryExecuteAsync methods added to RRemoteService object. #2525

pull/2533/head
Nikita Koksharov 5 years ago
parent ae7d92db03
commit 5264cd16f3

@ -15,22 +15,7 @@
*/
package org.redisson;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RBlockingQueueAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RList;
import org.redisson.api.RMap;
import org.redisson.api.RRemoteService;
import org.redisson.api.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
@ -39,20 +24,18 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.BaseRemoteService;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteServiceAck;
import org.redisson.remote.RemoteServiceCancelRequest;
import org.redisson.remote.RemoteServiceCancelResponse;
import org.redisson.remote.RemoteServiceKey;
import org.redisson.remote.RemoteServiceMethod;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RemoteServiceResponse;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import org.redisson.remote.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* @author Nikita Koksharov
@ -85,13 +68,17 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = new ConcurrentHashMap<>();
private final Map<Class<?>, Entry> remoteMap = new ConcurrentHashMap<>();
public RedissonRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, executorId, responses);
}
public String getRequestTasksMapName(Class<?> remoteInterface) {
String queue = getRequestQueueName(remoteInterface);
return queue + ":tasks";
}
@Override
protected RFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request,
RemotePromise<Object> result) {
@ -109,9 +96,11 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
@Override
protected RFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('lrem', KEYS[1], 1, ARGV[1]); "
+ "redis.call('hdel', KEYS[2], ARGV[1]);"
+ "return 1;",
"if redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then "
+ "redis.call('hdel', KEYS[2], ARGV[1]);" +
"return 1;" +
"end;"
+ "return 0;",
Arrays.<Object>asList(requestQueueName, requestQueueName + ":tasks"),
taskId.toString());
}
@ -123,11 +112,6 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
@Override
public <T> void deregister(Class<T> remoteInterface) {
for (Method method : remoteInterface.getMethods()) {
RemoteServiceKey key = new RemoteServiceKey(remoteInterface, method.getName(), getMethodSignature(method));
beans.remove(key);
}
Entry entry = remoteMap.remove(remoteInterface);
if (entry != null && entry.getFuture() != null) {
entry.getFuture().cancel(false);
@ -164,23 +148,87 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
if (workers < 1) {
throw new IllegalArgumentException("executorsAmount can't be lower than 1");
}
for (Method method : remoteInterface.getMethods()) {
RemoteServiceMethod value = new RemoteServiceMethod(method, object);
RemoteServiceKey key = new RemoteServiceKey(remoteInterface, method.getName(), getMethodSignature(method));
if (beans.put(key, value) != null) {
if (remoteMap.putIfAbsent(remoteInterface, new Entry(workers)) != null) {
return;
}
String requestQueueName = getRequestQueueName(remoteInterface);
RBlockingQueue<String> requestQueue = getBlockingQueue(requestQueueName, StringCodec.INSTANCE);
subscribe(remoteInterface, requestQueue, executor, object);
}
remoteMap.put(remoteInterface, new Entry(workers));
@Override
public <T> boolean tryExecute(Class<T> remoteInterface, T object, long timeout, TimeUnit timeUnit) throws InterruptedException {
String requestQueueName = getRequestQueueName(remoteInterface);
RBlockingQueue<String> requestQueue = getBlockingQueue(requestQueueName, StringCodec.INSTANCE);
String requestId = requestQueue.poll(timeout, timeUnit);
if (requestId == null) {
return false;
}
RMap<String, RemoteServiceRequest> tasks = getMap(requestQueue.getName() + ":tasks");
RFuture<RemoteServiceRequest> taskFuture = getTask(requestId, tasks);
commandExecutor.getInterrupted(taskFuture);
RemoteServiceRequest request = taskFuture.getNow();
if (request == null) {
throw new IllegalStateException("Task can't be found for request: " + requestId);
}
RFuture<RRemoteServiceResponse> r = executeMethod(remoteInterface, requestQueue, commandExecutor.getConnectionManager().getExecutor(), request, object);
commandExecutor.getInterrupted(r);
return true;
}
@Override
public <T> RFuture<Boolean> tryExecuteAsync(Class<T> remoteInterface, T object) {
RPromise<Boolean> result = new RedissonPromise<>();
String requestQueueName = getRequestQueueName(remoteInterface);
RBlockingQueue<String> requestQueue = getBlockingQueue(requestQueueName, StringCodec.INSTANCE);
subscribe(remoteInterface, requestQueue, executor);
RFuture<String> poll = requestQueue.pollAsync();
poll.onComplete((requestId, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
if (requestId == null) {
result.trySuccess(false);
return;
}
RMap<String, RemoteServiceRequest> tasks = getMap(requestQueue.getName() + ":tasks");
RFuture<RemoteServiceRequest> taskFuture = getTask(requestId, tasks);
taskFuture.onComplete((request, exc) -> {
if (exc != null) {
result.tryFailure(exc);
return;
}
if (request == null) {
result.tryFailure(new IllegalStateException("Task can't be found for request: " + requestId));
return;
}
RFuture<RRemoteServiceResponse> future = executeMethod(remoteInterface, requestQueue, commandExecutor.getConnectionManager().getExecutor(), request, object);
future.onComplete((r, ex) -> {
if (ex != null) {
result.tryFailure(ex);
return;
}
result.trySuccess(true);
});
});
});
return result;
}
private <T> void subscribe(Class<T> remoteInterface, RBlockingQueue<String> requestQueue,
ExecutorService executor) {
ExecutorService executor, Object bean) {
Entry entry = remoteMap.get(remoteInterface);
if (entry == null) {
return;
@ -199,7 +247,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
log.error("Can't process the remote service request.", e);
// re-subscribe after a failed takeAsync
subscribe(remoteInterface, requestQueue, executor);
subscribe(remoteInterface, requestQueue, executor, bean);
return;
}
@ -212,7 +260,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
if (entry.getCounter().decrementAndGet() > 0) {
subscribe(remoteInterface, requestQueue, executor);
subscribe(remoteInterface, requestQueue, executor, bean);
}
RMap<String, RemoteServiceRequest> tasks = getMap(requestQueue.getName() + ":tasks");
@ -225,7 +273,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
log.error("Can't process the remote service request with id " + requestId, exc);
// re-subscribe after a failed takeAsync
resubscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor, bean);
return;
}
@ -233,7 +281,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
log.debug("Task can't be found for request: {}", requestId);
// re-subscribe after a skipped ackTimeout
resubscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor, bean);
return;
}
@ -244,7 +292,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
log.debug("request: {} has been skipped due to ackTimeout. Elapsed time: {}ms", request.getId(), elapsedTime);
// re-subscribe after a skipped ackTimeout
resubscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor, bean);
return;
}
@ -275,12 +323,12 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
log.error("Can't send ack for request: " + request, ex);
// re-subscribe after a failed send (ack)
resubscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor, bean);
return;
}
if (!r) {
resubscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor, bean);
return;
}
@ -295,39 +343,42 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
log.error("Can't send ack for request: " + request, exce);
// re-subscribe after a failed send (ack)
resubscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor, bean);
return;
}
if (!res) {
resubscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor, bean);
return;
}
executeMethod(remoteInterface, requestQueue, executor, request);
executeMethod(remoteInterface, requestQueue, executor, request, bean);
});
});
} else {
executeMethod(remoteInterface, requestQueue, executor, request);
executeMethod(remoteInterface, requestQueue, executor, request, bean);
}
});
});
}
private <T> void executeMethod(Class<T> remoteInterface, RBlockingQueue<String> requestQueue,
ExecutorService executor, RemoteServiceRequest request) {
RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName(), request.getSignature()));
String responseName = getResponseQueueName(request.getExecutorId());
private <T> RFuture<RRemoteServiceResponse> executeMethod(Class<T> remoteInterface, RBlockingQueue<String> requestQueue,
ExecutorService executor, RemoteServiceRequest request, Object bean) {
RemoteServiceMethod method = Arrays.stream(remoteInterface.getMethods())
.filter(m -> m.getName().equals(request.getMethodName())
&& Arrays.equals(getMethodSignature(m), request.getSignature()))
.map(m -> new RemoteServiceMethod(m, bean))
.findFirst().get();
AtomicReference<RRemoteServiceResponse> responseHolder = new AtomicReference<RRemoteServiceResponse>();
String responseName = getResponseQueueName(request.getExecutorId());
RPromise<RemoteServiceCancelRequest> cancelRequestFuture = new RedissonPromise<RemoteServiceCancelRequest>();
RPromise<RRemoteServiceResponse> responsePromise = new RedissonPromise<>();
RPromise<RemoteServiceCancelRequest> cancelRequestFuture = new RedissonPromise<>();
scheduleCheck(cancelRequestMapName, new RequestId(request.getId()), cancelRequestFuture);
java.util.concurrent.Future<?> submitFuture = executor.submit(() -> {
invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor,
cancelRequestFuture, responseHolder);
cancelRequestFuture, responsePromise);
});
cancelRequestFuture.onComplete((r, e) -> {
@ -338,7 +389,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
boolean res = submitFuture.cancel(r.isMayInterruptIfRunning());
if (res) {
RemoteServiceCancelResponse response = new RemoteServiceCancelResponse(request.getId(), true);
if (!responseHolder.compareAndSet(null, response)) {
if (!responsePromise.trySuccess(response)) {
response = new RemoteServiceCancelResponse(request.getId(), false);
}
@ -350,20 +401,22 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
}
});
return responsePromise;
}
protected <T> void invokeMethod(Class<T> remoteInterface,
RBlockingQueue<String> requestQueue, RemoteServiceRequest request,
RemoteServiceMethod method, String responseName, ExecutorService executor,
RFuture<RemoteServiceCancelRequest> cancelRequestFuture, AtomicReference<RRemoteServiceResponse> responseHolder) {
RFuture<RemoteServiceCancelRequest> cancelRequestFuture, RPromise<RRemoteServiceResponse> responsePromise) {
try {
Object result = method.getMethod().invoke(method.getBean(), request.getArgs());
RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), result);
responseHolder.compareAndSet(null, response);
responsePromise.trySuccess(response);
} catch (Exception e) {
RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), e.getCause());
responseHolder.compareAndSet(null, response);
responsePromise.trySuccess(response);
log.error("Can't execute: " + request, e);
}
@ -373,7 +426,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
// send the response only if expected or task was canceled
if (request.getOptions().isResultExpected()
|| responseHolder.get() instanceof RemoteServiceCancelResponse) {
|| responsePromise.getNow() instanceof RemoteServiceCancelResponse) {
long timeout = 60 * 1000;
if (request.getOptions().getExecutionTimeoutInMillis() != null) {
timeout = request.getOptions().getExecutionTimeoutInMillis();
@ -381,7 +434,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
RBlockingQueueAsync<RRemoteServiceResponse> queue = getBlockingQueue(responseName, codec);
try {
RFuture<Void> clientsFuture = queue.putAsync(responseHolder.get());
RFuture<Void> clientsFuture = queue.putAsync(responsePromise.getNow());
queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
clientsFuture.onComplete((res, e) -> {
@ -389,25 +442,25 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
if (e instanceof RedissonShutdownException) {
return;
}
log.error("Can't send response: " + responseHolder.get() + " for request: " + request, e);
log.error("Can't send response: " + responsePromise.getNow() + " for request: " + request, e);
}
resubscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor, method.getBean());
});
} catch (Exception e) {
log.error("Can't send response: " + responseHolder.get() + " for request: " + request, e);
log.error("Can't send response: " + responsePromise.getNow() + " for request: " + request, e);
}
} else {
resubscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor, method.getBean());
}
}
private <T> void resubscribe(Class<T> remoteInterface, RBlockingQueue<String> requestQueue,
ExecutorService executor) {
ExecutorService executor, Object bean) {
Entry entry = remoteMap.get(remoteInterface);
if (entry != null && entry.getCounter().getAndIncrement() == 0) {
// re-subscribe anyways after the method invocation
subscribe(remoteInterface, requestQueue, executor);
subscribe(remoteInterface, requestQueue, executor, bean);
}
}

@ -112,6 +112,32 @@ public interface RRemoteService {
*/
<T> void deregister(Class<T> remoteInterface);
/**
* Tries to execute one awaiting remote request.
* Waits up to <code>timeout</code> if necessary until remote request became available.
*
* @param remoteInterface - remote service interface
* @param object - remote service object
* @param timeout - maximum wait time until remote request became available
* @param timeUnit - time unit
* @param <T> - type of remote service
* @return <code>true</code> if method was successfully executed and
* <code>false</code> if timeout reached before execution
* @throws InterruptedException - if the thread is interrupted
*/
<T> boolean tryExecute(Class<T> remoteInterface, T object, long timeout, TimeUnit timeUnit) throws InterruptedException;
/**
* Tries to execute one awaiting remote request.
*
* @param remoteInterface - remote service interface
* @param object - remote service object
* @param <T> - type of remote service
* @return <code>true</code> if method was successfully executed and
* <code>false</code> if timeout reached before execution
*/
<T> RFuture<Boolean> tryExecuteAsync(Class<T> remoteInterface, T object);
/**
* Get remote service object for remote invocations.
* <p>

@ -17,7 +17,9 @@ package org.redisson.executor;
import org.redisson.RedissonExecutorService;
import org.redisson.RedissonRemoteService;
import org.redisson.api.*;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.executor.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
@ -30,7 +32,6 @@ import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
@ -86,7 +87,9 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
}
@Override
protected <T> void invokeMethod(Class<T> remoteInterface, RBlockingQueue<String> requestQueue, RemoteServiceRequest request, RemoteServiceMethod method, String responseName, ExecutorService executor, RFuture<RemoteServiceCancelRequest> cancelRequestFuture, AtomicReference<RRemoteServiceResponse> responseHolder) {
protected <T> void invokeMethod(Class<T> remoteInterface, RBlockingQueue<String> requestQueue, RemoteServiceRequest request,
RemoteServiceMethod method, String responseName, ExecutorService executor, RFuture<RemoteServiceCancelRequest> cancelRequestFuture,
RPromise<RRemoteServiceResponse> responsePromise) {
startedListeners.stream().forEach(l -> l.onStarted(request.getId()));
if (taskTimeout > 0) {
@ -94,10 +97,10 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
((RPromise) cancelRequestFuture).trySuccess(new RemoteServiceCancelRequest(true, false));
}, taskTimeout, TimeUnit.MILLISECONDS);
}
super.invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, cancelRequestFuture, responseHolder);
super.invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, cancelRequestFuture, responsePromise);
if (responseHolder.get() instanceof RemoteServiceResponse) {
RemoteServiceResponse response = (RemoteServiceResponse) responseHolder.get();
if (responsePromise.getNow() instanceof RemoteServiceResponse) {
RemoteServiceResponse response = (RemoteServiceResponse) responsePromise.getNow();
if (response.getError() == null) {
successListeners.stream().forEach(l -> l.onSucceeded(request.getId(), response.getResult()));
} else {

Loading…
Cancel
Save