refactoring

pull/4091/head
Nikita Koksharov 3 years ago
parent b758cde7f0
commit 22f5f98631

@ -22,8 +22,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.remote.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -198,8 +197,6 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
@Override
public <T> RFuture<Boolean> tryExecuteAsync(Class<T> remoteInterface, T object, ExecutorService executor, long timeout, TimeUnit timeUnit) {
RPromise<Boolean> result = new RedissonPromise<>();
result.setUncancellable();
String requestQueueName = getRequestQueueName(remoteInterface);
RBlockingQueue<String> requestQueue = getBlockingQueue(requestQueueName, StringCodec.INSTANCE);
@ -209,42 +206,23 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
} else {
pollFuture = requestQueue.pollAsync(timeout, timeUnit);
}
pollFuture.onComplete((requestId, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
CompletionStage<Boolean> f = pollFuture.thenCompose(requestId -> {
if (requestId == null) {
result.trySuccess(false);
return;
return CompletableFuture.completedFuture(false);
}
RMap<String, RemoteServiceRequest> tasks = getMap(((RedissonObject) requestQueue).getRawName() + ":tasks");
RFuture<RemoteServiceRequest> taskFuture = getTask(requestId, tasks);
taskFuture.onComplete((request, exc) -> {
if (exc != null) {
result.tryFailure(exc);
return;
}
return taskFuture.thenCompose(request -> {
if (request == null) {
result.tryFailure(new IllegalStateException("Task can't be found for request: " + requestId));
return;
throw new CompletionException(new IllegalStateException("Task can't be found for request: " + requestId));
}
RFuture<RRemoteServiceResponse> future = executeMethod(remoteInterface, requestQueue, executor, request, object);
future.onComplete((r, ex) -> {
if (ex != null) {
result.tryFailure(ex);
return;
}
result.trySuccess(true);
});
return future.thenApply(r -> true);
});
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -260,7 +238,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
RFuture<String> take = requestQueue.pollAsync(60, TimeUnit.SECONDS);
entry.setFuture(take);
take.onComplete((requestId, e) -> {
take.whenComplete((requestId, e) -> {
Entry entr = remoteMap.get(remoteInterface);
if (entr == null) {
return;
@ -297,7 +275,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
RMap<String, RemoteServiceRequest> tasks = getMap(((RedissonObject) requestQueue).getRawName() + ":tasks");
RFuture<RemoteServiceRequest> taskFuture = getTask(requestId, tasks);
taskFuture.onComplete((request, exc) -> {
taskFuture.whenComplete((request, exc) -> {
if (exc != null) {
if (exc instanceof RedissonShutdownException) {
return;
@ -347,7 +325,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
// Arrays.<Object>asList(ackName, responseName),
// encode(new RemoteServiceAck(request.getId())), request.getOptions().getAckTimeoutInMillis());
ackClientsFuture.onComplete((r, ex) -> {
ackClientsFuture.whenComplete((r, ex) -> {
if (ex != null) {
if (ex instanceof RedissonShutdownException) {
return;
@ -367,7 +345,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
RList<Object> list = new RedissonList<>(codec, commandExecutor, responseName, null);
RFuture<Boolean> addFuture = list.addAsync(new RemoteServiceAck(request.getId()));
addFuture.onComplete((res, exce) -> {
addFuture.whenComplete((res, exce) -> {
if (exce != null) {
if (exce instanceof RedissonShutdownException) {
return;
@ -404,11 +382,11 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
String responseName = getResponseQueueName(request.getExecutorId());
RPromise<RRemoteServiceResponse> responsePromise = new RedissonPromise<>();
RPromise<RemoteServiceCancelRequest> cancelRequestFuture = new RedissonPromise<>();
CompletableFuture<RRemoteServiceResponse> responsePromise = new CompletableFuture<>();
CompletableFuture<RemoteServiceCancelRequest> cancelRequestFuture = new CompletableFuture<>();
scheduleCheck(cancelRequestMapName, new RequestId(request.getId()), cancelRequestFuture);
responsePromise.onComplete((result, e) -> {
responsePromise.whenComplete((result, e) -> {
if (request.getOptions().isResultExpected()
|| result instanceof RemoteServiceCancelResponse) {
@ -430,7 +408,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
RFuture<Void> clientsFuture = queue.putAsync(response);
queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
clientsFuture.onComplete((res, exc) -> {
clientsFuture.whenComplete((res, exc) -> {
if (exc != null) {
if (exc instanceof RedissonShutdownException) {
return;
@ -456,15 +434,11 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
invokeMethod(request, method, cancelRequestFuture, responsePromise);
});
cancelRequestFuture.onComplete((r, e) -> {
if (e != null) {
return;
}
cancelRequestFuture.thenAccept(r -> {
boolean res = submitFuture.cancel(r.isMayInterruptIfRunning());
if (res) {
RemoteServiceCancelResponse response = new RemoteServiceCancelResponse(request.getId(), true);
if (!responsePromise.trySuccess(response)) {
if (!responsePromise.complete(response)) {
response = new RemoteServiceCancelResponse(request.getId(), false);
}
@ -477,19 +451,20 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
});
return responsePromise;
return new CompletableFutureWrapper<>(responsePromise);
}
protected <T> void invokeMethod(RemoteServiceRequest request, RemoteServiceMethod method,
RFuture<RemoteServiceCancelRequest> cancelRequestFuture, RPromise<RRemoteServiceResponse> responsePromise) {
CompletableFuture<RemoteServiceCancelRequest> cancelRequestFuture,
CompletableFuture<RRemoteServiceResponse> responsePromise) {
try {
Object result = method.getMethod().invoke(method.getBean(), request.getArgs());
RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), result);
responsePromise.trySuccess(response);
responsePromise.complete(response);
} catch (Exception e) {
RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), e.getCause());
responsePromise.trySuccess(response);
responsePromise.complete(response);
log.error("Can't execute: " + request, e);
}

@ -25,7 +25,6 @@ import org.redisson.api.executor.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.remote.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -93,12 +93,13 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
@Override
protected <T> void invokeMethod(RemoteServiceRequest request, RemoteServiceMethod method,
RFuture<RemoteServiceCancelRequest> cancelRequestFuture, RPromise<RRemoteServiceResponse> responsePromise) {
startedListeners.stream().forEach(l -> l.onStarted(request.getId()));
CompletableFuture<RemoteServiceCancelRequest> cancelRequestFuture,
CompletableFuture<RRemoteServiceResponse> responsePromise) {
startedListeners.forEach(l -> l.onStarted(request.getId()));
if (taskTimeout > 0) {
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
((RPromise) cancelRequestFuture).trySuccess(new RemoteServiceCancelRequest(true, false));
cancelRequestFuture.complete(new RemoteServiceCancelRequest(true, false));
}, taskTimeout, TimeUnit.MILLISECONDS);
}
@ -106,7 +107,7 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
Object result = method.getMethod().invoke(method.getBean(), request.getArgs());
RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), result);
responsePromise.trySuccess(response);
responsePromise.complete(response);
} catch (Exception e) {
if (e instanceof InvocationTargetException
&& e.getCause() instanceof RedissonShutdownException) {
@ -116,7 +117,7 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
return;
}
RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), e.getCause());
responsePromise.trySuccess(response);
responsePromise.complete(response);
log.error("Can't execute: " + request, e);
}
@ -124,18 +125,18 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
cancelRequestFuture.cancel(false);
}
if (responsePromise.getNow() instanceof RemoteServiceResponse) {
RemoteServiceResponse response = (RemoteServiceResponse) responsePromise.getNow();
if (commandExecutor.getNow(responsePromise) instanceof RemoteServiceResponse) {
RemoteServiceResponse response = (RemoteServiceResponse) commandExecutor.getNow(responsePromise);
if (response.getError() == null) {
successListeners.stream().forEach(l -> l.onSucceeded(request.getId(), response.getResult()));
successListeners.forEach(l -> l.onSucceeded(request.getId(), response.getResult()));
} else {
failureListeners.stream().forEach(l -> l.onFailed(request.getId(), response.getError()));
failureListeners.forEach(l -> l.onFailed(request.getId(), response.getError()));
}
} else {
failureListeners.stream().forEach(l -> l.onFailed(request.getId(), null));
failureListeners.forEach(l -> l.onFailed(request.getId(), null));
}
finishedListeners.stream().forEach(l -> l.onFinished(request.getId()));
finishedListeners.forEach(l -> l.onFinished(request.getId()));
}
public void setListeners(List<TaskListener> listeners) {

@ -33,7 +33,6 @@ import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.Hash;
import org.redisson.misc.RPromise;
import java.io.IOException;
import java.lang.annotation.Annotation;
@ -147,7 +146,7 @@ public abstract class BaseRemoteService {
return new RedissonMap<>(new CompositeCodec(StringCodec.INSTANCE, codec, codec), commandExecutor, name, null, null, null);
}
protected <T> void scheduleCheck(String mapName, RequestId requestId, RPromise<T> cancelRequest) {
protected <T> void scheduleCheck(String mapName, RequestId requestId, CompletableFuture<T> cancelRequest) {
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
@ -169,7 +168,7 @@ public abstract class BaseRemoteService {
if (request == null) {
scheduleCheck(mapName, requestId, cancelRequest);
} else {
cancelRequest.trySuccess(request);
cancelRequest.complete(request);
}
});
}

Loading…
Cancel
Save