refactoring

pull/2563/head
Nikita Koksharov 5 years ago
parent 721d01c337
commit ec153b891c

@ -167,6 +167,11 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
@Override
public <T> boolean tryExecute(Class<T> remoteInterface, T object, long timeout, TimeUnit timeUnit) throws InterruptedException {
return tryExecute(remoteInterface, object, commandExecutor.getConnectionManager().getExecutor(), timeout, timeUnit);
}
@Override
public <T> boolean tryExecute(Class<T> remoteInterface, T object, ExecutorService executorService, long timeout, TimeUnit timeUnit) throws InterruptedException {
String requestQueueName = getRequestQueueName(remoteInterface);
RBlockingQueue<String> requestQueue = getBlockingQueue(requestQueueName, StringCodec.INSTANCE);
@ -184,13 +189,18 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
throw new IllegalStateException("Task can't be found for request: " + requestId);
}
RFuture<RRemoteServiceResponse> r = executeMethod(remoteInterface, requestQueue, commandExecutor.getConnectionManager().getExecutor(), request, object);
RFuture<RRemoteServiceResponse> r = executeMethod(remoteInterface, requestQueue, executorService, request, object);
commandExecutor.getInterrupted(r);
return true;
}
@Override
public <T> RFuture<Boolean> tryExecuteAsync(Class<T> remoteInterface, T object, long timeout, TimeUnit timeUnit) {
return tryExecuteAsync(remoteInterface, object, commandExecutor.getConnectionManager().getExecutor(), timeout, timeUnit);
}
@Override
public <T> RFuture<Boolean> tryExecuteAsync(Class<T> remoteInterface, T object, ExecutorService executor, long timeout, TimeUnit timeUnit) {
RPromise<Boolean> result = new RedissonPromise<>();
result.setUncancellable();
String requestQueueName = getRequestQueueName(remoteInterface);
@ -226,7 +236,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
return;
}
RFuture<RRemoteServiceResponse> future = executeMethod(remoteInterface, requestQueue, commandExecutor.getConnectionManager().getExecutor(), request, object);
RFuture<RRemoteServiceResponse> future = executeMethod(remoteInterface, requestQueue, executor, request, object);
future.onComplete((r, ex) -> {
if (ex != null) {
result.tryFailure(ex);
@ -394,11 +404,42 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
RPromise<RemoteServiceCancelRequest> cancelRequestFuture = new RedissonPromise<>();
scheduleCheck(cancelRequestMapName, new RequestId(request.getId()), cancelRequestFuture);
responsePromise.onComplete((result, e) -> {
if (request.getOptions().isResultExpected()
|| result instanceof RemoteServiceCancelResponse) {
long timeout = 60 * 1000;
if (request.getOptions().getExecutionTimeoutInMillis() != null) {
timeout = request.getOptions().getExecutionTimeoutInMillis();
}
RBlockingQueueAsync<RRemoteServiceResponse> queue = getBlockingQueue(responseName, codec);
try {
RFuture<Void> clientsFuture = queue.putAsync(result);
queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
clientsFuture.onComplete((res, exc) -> {
if (exc != null) {
if (exc instanceof RedissonShutdownException) {
return;
}
log.error("Can't send response: " + result + " for request: " + request, exc);
}
resubscribe(remoteInterface, requestQueue, executor, method.getBean());
});
} catch (Exception ex) {
log.error("Can't send response: " + result + " for request: " + request, e);
}
} else {
resubscribe(remoteInterface, requestQueue, executor, method.getBean());
}
});
java.util.concurrent.Future<?> submitFuture = executor.submit(() -> {
invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor,
cancelRequestFuture, responsePromise);
invokeMethod(request, method, cancelRequestFuture, responsePromise);
});
cancelRequestFuture.onComplete((r, e) -> {
if (e != null) {
return;
@ -423,10 +464,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
return responsePromise;
}
protected <T> void invokeMethod(Class<T> remoteInterface,
RBlockingQueue<String> requestQueue, RemoteServiceRequest request,
RemoteServiceMethod method, String responseName, ExecutorService executor,
RFuture<RemoteServiceCancelRequest> cancelRequestFuture, RPromise<RRemoteServiceResponse> responsePromise) {
protected <T> void invokeMethod(RemoteServiceRequest request, RemoteServiceMethod method,
RFuture<RemoteServiceCancelRequest> cancelRequestFuture, RPromise<RRemoteServiceResponse> responsePromise) {
try {
Object result = method.getMethod().invoke(method.getBean(), request.getArgs());
@ -441,36 +480,6 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
if (cancelRequestFuture != null) {
cancelRequestFuture.cancel(false);
}
// send the response only if expected or task was canceled
if (request.getOptions().isResultExpected()
|| responsePromise.getNow() instanceof RemoteServiceCancelResponse) {
long timeout = 60 * 1000;
if (request.getOptions().getExecutionTimeoutInMillis() != null) {
timeout = request.getOptions().getExecutionTimeoutInMillis();
}
RBlockingQueueAsync<RRemoteServiceResponse> queue = getBlockingQueue(responseName, codec);
try {
RFuture<Void> clientsFuture = queue.putAsync(responsePromise.getNow());
queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
clientsFuture.onComplete((res, e) -> {
if (e != null) {
if (e instanceof RedissonShutdownException) {
return;
}
log.error("Can't send response: " + responsePromise.getNow() + " for request: " + request, e);
}
resubscribe(remoteInterface, requestQueue, executor, method.getBean());
});
} catch (Exception e) {
log.error("Can't send response: " + responsePromise.getNow() + " for request: " + request, e);
}
} else {
resubscribe(remoteInterface, requestQueue, executor, method.getBean());
}
}
private <T> void resubscribe(Class<T> remoteInterface, RBlockingQueue<String> requestQueue,

@ -108,7 +108,7 @@ public interface RRemoteService {
* @param remoteInterface - remote service interface
* @param object - remote service object
* @param workers - workers amount
* @param executor - executor service
* @param executor - executor service used to invoke methods
*/
<T> void register(Class<T> remoteInterface, T object, int workers, ExecutorService executor);
@ -135,6 +135,22 @@ public interface RRemoteService {
*/
<T> boolean tryExecute(Class<T> remoteInterface, T object, long timeout, TimeUnit timeUnit) throws InterruptedException;
/**
* Tries to execute one awaiting remote request.
* Waits up to <code>timeout</code> if necessary until remote request became available.
*
* @param remoteInterface - remote service interface
* @param object - remote service object
* @param timeout - maximum wait time until remote request became available
* @param timeUnit - time unit
* @param executorService - executor service used to invoke methods
* @param <T> - type of remote service
* @return <code>true</code> if method was successfully executed and
* <code>false</code> if timeout reached before execution
* @throws InterruptedException - if the thread is interrupted
*/
<T> boolean tryExecute(Class<T> remoteInterface, T object, ExecutorService executorService, long timeout, TimeUnit timeUnit) throws InterruptedException;
/**
* Tries to execute one awaiting remote request.
*
@ -160,6 +176,21 @@ public interface RRemoteService {
*/
<T> RFuture<Boolean> tryExecuteAsync(Class<T> remoteInterface, T object, long timeout, TimeUnit timeUnit);
/**
* Tries to execute one awaiting remote request.
* Waits up to <code>timeout</code> if necessary until remote request became available.
*
* @param remoteInterface - remote service interface
* @param object - remote service object
* @param timeout - maximum wait time until remote request became available
* @param timeUnit - time unit
* @param executorService - executor service used to invoke methods
* @param <T> - type of remote service
* @return <code>true</code> if method was successfully executed and
* <code>false</code> if timeout reached before execution
*/
<T> RFuture<Boolean> tryExecuteAsync(Class<T> remoteInterface, T object, ExecutorService executorService, long timeout, TimeUnit timeUnit);
/**
* Get remote service object for remote invocations.
* <p>

@ -87,9 +87,8 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
}
@Override
protected <T> void invokeMethod(Class<T> remoteInterface, RBlockingQueue<String> requestQueue, RemoteServiceRequest request,
RemoteServiceMethod method, String responseName, ExecutorService executor, RFuture<RemoteServiceCancelRequest> cancelRequestFuture,
RPromise<RRemoteServiceResponse> responsePromise) {
protected <T> void invokeMethod(RemoteServiceRequest request, RemoteServiceMethod method,
RFuture<RemoteServiceCancelRequest> cancelRequestFuture, RPromise<RRemoteServiceResponse> responsePromise) {
startedListeners.stream().forEach(l -> l.onStarted(request.getId()));
if (taskTimeout > 0) {
@ -97,7 +96,7 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
((RPromise) cancelRequestFuture).trySuccess(new RemoteServiceCancelRequest(true, false));
}, taskTimeout, TimeUnit.MILLISECONDS);
}
super.invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, cancelRequestFuture, responsePromise);
super.invokeMethod(request, method, cancelRequestFuture, responsePromise);
if (responsePromise.getNow() instanceof RemoteServiceResponse) {
RemoteServiceResponse response = (RemoteServiceResponse) responsePromise.getNow();

Loading…
Cancel
Save