From 22f5f98631a2cdbf6ea3dc011a9327f3540cba19 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov <nkoksharov@redisson.pro> Date: Mon, 24 Jan 2022 11:55:37 +0300 Subject: [PATCH] refactoring --- .../org/redisson/RedissonRemoteService.java | 69 ++++++------------- .../RedissonExecutorRemoteService.java | 25 +++---- .../redisson/remote/BaseRemoteService.java | 5 +- 3 files changed, 37 insertions(+), 62 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index 4763bf45c..1967cf12a 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -22,8 +22,7 @@ import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.executor.RemotePromise; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; +import org.redisson.misc.CompletableFutureWrapper; import org.redisson.remote.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -198,8 +197,6 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS @Override public <T> RFuture<Boolean> tryExecuteAsync(Class<T> remoteInterface, T object, ExecutorService executor, long timeout, TimeUnit timeUnit) { - RPromise<Boolean> result = new RedissonPromise<>(); - result.setUncancellable(); String requestQueueName = getRequestQueueName(remoteInterface); RBlockingQueue<String> requestQueue = getBlockingQueue(requestQueueName, StringCodec.INSTANCE); @@ -209,42 +206,23 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } else { pollFuture = requestQueue.pollAsync(timeout, timeUnit); } - pollFuture.onComplete((requestId, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + CompletionStage<Boolean> f = pollFuture.thenCompose(requestId -> { if (requestId == null) { - result.trySuccess(false); - return; + return CompletableFuture.completedFuture(false); } RMap<String, RemoteServiceRequest> tasks = getMap(((RedissonObject) requestQueue).getRawName() + ":tasks"); RFuture<RemoteServiceRequest> taskFuture = getTask(requestId, tasks); - taskFuture.onComplete((request, exc) -> { - if (exc != null) { - result.tryFailure(exc); - return; - } - + return taskFuture.thenCompose(request -> { if (request == null) { - result.tryFailure(new IllegalStateException("Task can't be found for request: " + requestId)); - return; + throw new CompletionException(new IllegalStateException("Task can't be found for request: " + requestId)); } RFuture<RRemoteServiceResponse> future = executeMethod(remoteInterface, requestQueue, executor, request, object); - future.onComplete((r, ex) -> { - if (ex != null) { - result.tryFailure(ex); - return; - } - - result.trySuccess(true); - }); + return future.thenApply(r -> true); }); }); - return result; + return new CompletableFutureWrapper<>(f); } @Override @@ -260,7 +238,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } RFuture<String> take = requestQueue.pollAsync(60, TimeUnit.SECONDS); entry.setFuture(take); - take.onComplete((requestId, e) -> { + take.whenComplete((requestId, e) -> { Entry entr = remoteMap.get(remoteInterface); if (entr == null) { return; @@ -297,7 +275,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS RMap<String, RemoteServiceRequest> tasks = getMap(((RedissonObject) requestQueue).getRawName() + ":tasks"); RFuture<RemoteServiceRequest> taskFuture = getTask(requestId, tasks); - taskFuture.onComplete((request, exc) -> { + taskFuture.whenComplete((request, exc) -> { if (exc != null) { if (exc instanceof RedissonShutdownException) { return; @@ -347,7 +325,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS // Arrays.<Object>asList(ackName, responseName), // encode(new RemoteServiceAck(request.getId())), request.getOptions().getAckTimeoutInMillis()); - ackClientsFuture.onComplete((r, ex) -> { + ackClientsFuture.whenComplete((r, ex) -> { if (ex != null) { if (ex instanceof RedissonShutdownException) { return; @@ -367,7 +345,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS RList<Object> list = new RedissonList<>(codec, commandExecutor, responseName, null); RFuture<Boolean> addFuture = list.addAsync(new RemoteServiceAck(request.getId())); - addFuture.onComplete((res, exce) -> { + addFuture.whenComplete((res, exce) -> { if (exce != null) { if (exce instanceof RedissonShutdownException) { return; @@ -404,11 +382,11 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS String responseName = getResponseQueueName(request.getExecutorId()); - RPromise<RRemoteServiceResponse> responsePromise = new RedissonPromise<>(); - RPromise<RemoteServiceCancelRequest> cancelRequestFuture = new RedissonPromise<>(); + CompletableFuture<RRemoteServiceResponse> responsePromise = new CompletableFuture<>(); + CompletableFuture<RemoteServiceCancelRequest> cancelRequestFuture = new CompletableFuture<>(); scheduleCheck(cancelRequestMapName, new RequestId(request.getId()), cancelRequestFuture); - responsePromise.onComplete((result, e) -> { + responsePromise.whenComplete((result, e) -> { if (request.getOptions().isResultExpected() || result instanceof RemoteServiceCancelResponse) { @@ -430,7 +408,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS RFuture<Void> clientsFuture = queue.putAsync(response); queue.expireAsync(timeout, TimeUnit.MILLISECONDS); - clientsFuture.onComplete((res, exc) -> { + clientsFuture.whenComplete((res, exc) -> { if (exc != null) { if (exc instanceof RedissonShutdownException) { return; @@ -456,15 +434,11 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS invokeMethod(request, method, cancelRequestFuture, responsePromise); }); - cancelRequestFuture.onComplete((r, e) -> { - if (e != null) { - return; - } - + cancelRequestFuture.thenAccept(r -> { boolean res = submitFuture.cancel(r.isMayInterruptIfRunning()); if (res) { RemoteServiceCancelResponse response = new RemoteServiceCancelResponse(request.getId(), true); - if (!responsePromise.trySuccess(response)) { + if (!responsePromise.complete(response)) { response = new RemoteServiceCancelResponse(request.getId(), false); } @@ -477,19 +451,20 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } }); - return responsePromise; + return new CompletableFutureWrapper<>(responsePromise); } protected <T> void invokeMethod(RemoteServiceRequest request, RemoteServiceMethod method, - RFuture<RemoteServiceCancelRequest> cancelRequestFuture, RPromise<RRemoteServiceResponse> responsePromise) { + CompletableFuture<RemoteServiceCancelRequest> cancelRequestFuture, + CompletableFuture<RRemoteServiceResponse> responsePromise) { try { Object result = method.getMethod().invoke(method.getBean(), request.getArgs()); RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), result); - responsePromise.trySuccess(response); + responsePromise.complete(response); } catch (Exception e) { RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), e.getCause()); - responsePromise.trySuccess(response); + responsePromise.complete(response); log.error("Can't execute: " + request, e); } diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java index 3be5c1beb..36acbfed0 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java @@ -25,7 +25,6 @@ import org.redisson.api.executor.*; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.RPromise; import org.redisson.remote.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -93,12 +93,13 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { @Override protected <T> void invokeMethod(RemoteServiceRequest request, RemoteServiceMethod method, - RFuture<RemoteServiceCancelRequest> cancelRequestFuture, RPromise<RRemoteServiceResponse> responsePromise) { - startedListeners.stream().forEach(l -> l.onStarted(request.getId())); + CompletableFuture<RemoteServiceCancelRequest> cancelRequestFuture, + CompletableFuture<RRemoteServiceResponse> responsePromise) { + startedListeners.forEach(l -> l.onStarted(request.getId())); if (taskTimeout > 0) { commandExecutor.getConnectionManager().getGroup().schedule(() -> { - ((RPromise) cancelRequestFuture).trySuccess(new RemoteServiceCancelRequest(true, false)); + cancelRequestFuture.complete(new RemoteServiceCancelRequest(true, false)); }, taskTimeout, TimeUnit.MILLISECONDS); } @@ -106,7 +107,7 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { Object result = method.getMethod().invoke(method.getBean(), request.getArgs()); RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), result); - responsePromise.trySuccess(response); + responsePromise.complete(response); } catch (Exception e) { if (e instanceof InvocationTargetException && e.getCause() instanceof RedissonShutdownException) { @@ -116,7 +117,7 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { return; } RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), e.getCause()); - responsePromise.trySuccess(response); + responsePromise.complete(response); log.error("Can't execute: " + request, e); } @@ -124,18 +125,18 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { cancelRequestFuture.cancel(false); } - if (responsePromise.getNow() instanceof RemoteServiceResponse) { - RemoteServiceResponse response = (RemoteServiceResponse) responsePromise.getNow(); + if (commandExecutor.getNow(responsePromise) instanceof RemoteServiceResponse) { + RemoteServiceResponse response = (RemoteServiceResponse) commandExecutor.getNow(responsePromise); if (response.getError() == null) { - successListeners.stream().forEach(l -> l.onSucceeded(request.getId(), response.getResult())); + successListeners.forEach(l -> l.onSucceeded(request.getId(), response.getResult())); } else { - failureListeners.stream().forEach(l -> l.onFailed(request.getId(), response.getError())); + failureListeners.forEach(l -> l.onFailed(request.getId(), response.getError())); } } else { - failureListeners.stream().forEach(l -> l.onFailed(request.getId(), null)); + failureListeners.forEach(l -> l.onFailed(request.getId(), null)); } - finishedListeners.stream().forEach(l -> l.onFinished(request.getId())); + finishedListeners.forEach(l -> l.onFinished(request.getId())); } public void setListeners(List<TaskListener> listeners) { diff --git a/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java b/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java index e6c3751cb..5f4863522 100644 --- a/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java @@ -33,7 +33,6 @@ import org.redisson.codec.CompositeCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.executor.RemotePromise; import org.redisson.misc.Hash; -import org.redisson.misc.RPromise; import java.io.IOException; import java.lang.annotation.Annotation; @@ -147,7 +146,7 @@ public abstract class BaseRemoteService { return new RedissonMap<>(new CompositeCodec(StringCodec.INSTANCE, codec, codec), commandExecutor, name, null, null, null); } - protected <T> void scheduleCheck(String mapName, RequestId requestId, RPromise<T> cancelRequest) { + protected <T> void scheduleCheck(String mapName, RequestId requestId, CompletableFuture<T> cancelRequest) { commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { @@ -169,7 +168,7 @@ public abstract class BaseRemoteService { if (request == null) { scheduleCheck(mapName, requestId, cancelRequest); } else { - cancelRequest.trySuccess(request); + cancelRequest.complete(request); } }); }