From cc956fcb536d4dcce72897bac59ed16b4a7e1ec6 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 4 Jan 2022 11:02:20 +0300 Subject: [PATCH] refactoring --- .../redisson/cache/LocalCacheListener.java | 44 ++++---------- .../org/redisson/redisnode/RedisNode.java | 18 ++---- .../redisson/redisnode/SentinelRedisNode.java | 42 ++++++-------- .../org/redisson/remote/AsyncRemoteProxy.java | 21 +++---- .../org/redisson/remote/BaseRemoteProxy.java | 57 +++++++------------ .../redisson/remote/BaseRemoteService.java | 2 +- .../org/redisson/remote/ResponseEntry.java | 13 ++--- .../org/redisson/remote/SyncRemoteProxy.java | 10 ++-- 8 files changed, 75 insertions(+), 132 deletions(-) diff --git a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java index c56f5fa2b..ff23ee399 100644 --- a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java +++ b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java @@ -29,17 +29,13 @@ import org.redisson.api.listener.MessageListener; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; +import org.redisson.misc.CompletableFutureWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * @@ -250,35 +246,17 @@ public abstract class LocalCacheListener { } public RFuture clearLocalCacheAsync() { - RPromise result = new RedissonPromise(); cache.clear(); byte[] id = generateId(); RFuture future = invalidationTopic.publishAsync(new LocalCachedMapClear(instanceId, id, true)); - future.onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + CompletionStage f = future.thenCompose(res -> { RSemaphore semaphore = getClearSemaphore(id); - semaphore.tryAcquireAsync(res.intValue() - 1, 50, TimeUnit.SECONDS).onComplete((r, ex) -> { - if (ex != null) { - result.tryFailure(ex); - return; - } - - semaphore.deleteAsync().onComplete((re, exc) -> { - if (exc != null) { - result.tryFailure(exc); - return; - } - - result.trySuccess(null); - }); - }); + return semaphore.tryAcquireAsync(res.intValue() - 1, 50, TimeUnit.SECONDS) + .thenCompose(r -> { + return semaphore.deleteAsync().thenApply(re -> null); + }); }); - - return result; + return new CompletableFutureWrapper<>(f); } public RTopic getInvalidationTopic() { @@ -328,7 +306,7 @@ public abstract class LocalCacheListener { return; } - object.isExistsAsync().onComplete((res, e) -> { + object.isExistsAsync().whenComplete((res, e) -> { if (e != null) { log.error("Can't check existance", e); return; @@ -339,9 +317,9 @@ public abstract class LocalCacheListener { return; } - RScoredSortedSet logs = new RedissonScoredSortedSet(ByteArrayCodec.INSTANCE, commandExecutor, getUpdatesLogName(), null); + RScoredSortedSet logs = new RedissonScoredSortedSet<>(ByteArrayCodec.INSTANCE, commandExecutor, getUpdatesLogName(), null); logs.valueRangeAsync(lastInvalidate, true, Double.POSITIVE_INFINITY, true) - .onComplete((r, ex) -> { + .whenComplete((r, ex) -> { if (ex != null) { log.error("Can't load update log", ex); return; diff --git a/redisson/src/main/java/org/redisson/redisnode/RedisNode.java b/redisson/src/main/java/org/redisson/redisnode/RedisNode.java index 4cfbdd9ab..bdef61dd5 100644 --- a/redisson/src/main/java/org/redisson/redisnode/RedisNode.java +++ b/redisson/src/main/java/org/redisson/redisnode/RedisNode.java @@ -26,14 +26,14 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.Time; import org.redisson.cluster.ClusterSlotRange; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.RPromise; +import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.RedisURI; -import org.redisson.misc.RedissonPromise; import java.net.InetSocketAddress; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; @@ -73,21 +73,13 @@ public class RedisNode implements RedisClusterMaster, RedisClusterSlave, RedisMa @Override public RFuture pingAsync(long timeout, TimeUnit timeUnit) { - RPromise result = new RedissonPromise<>(); RFuture f = commandExecutor.readAsync(client, null, RedisCommands.PING_BOOL); - f.onComplete((res, e) -> { - if (e != null) { - result.trySuccess(false); - return; - } - - result.trySuccess(res); - }); + CompletionStage s = f.exceptionally(e -> false); commandExecutor.getConnectionManager().newTimeout(t -> { RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout (" + timeUnit.toMillis(timeout) + "ms) for command: PING, Redis client: " + client); - result.tryFailure(ex); + s.toCompletableFuture().completeExceptionally(ex); }, timeout, timeUnit); - return result; + return new CompletableFutureWrapper<>(s); } @Override diff --git a/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java b/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java index fef568a0d..fdc3cb9fb 100644 --- a/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java +++ b/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java @@ -27,13 +27,14 @@ import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.Time; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.RPromise; +import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.RedisURI; -import org.redisson.misc.RedissonPromise; import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -119,35 +120,24 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync { } private RFuture executeAsync(T defaultValue, Codec codec, long timeout, RedisCommand command, Object... params) { - RPromise result = new RedissonPromise<>(); - CompletionStage connectionFuture = client.connectAsync(); - connectionFuture.whenComplete((connection, ex) -> { - if (ex != null) { - if (defaultValue != null) { - result.trySuccess(defaultValue); - } else { - result.tryFailure(ex); - } - return; + CompletableFuture connectionFuture = client.connectAsync().toCompletableFuture(); + CompletableFuture f = connectionFuture.thenCompose(connection -> { + return connection.async(timeout, codec, command, params); + }).handle((r, e) -> { + if (connectionFuture.isDone() && !connectionFuture.isCompletedExceptionally()) { + connectionFuture.getNow(null).closeAsync(); } - RFuture future = connection.async(timeout, codec, command, params); - future.onComplete((r, e) -> { - connection.closeAsync(); - - if (e != null) { - if (defaultValue != null) { - result.trySuccess(defaultValue); - } else { - result.tryFailure(e); - } - return; + if (e != null) { + if (defaultValue != null) { + return defaultValue; } + throw new CompletionException(e); + } - result.trySuccess(r); - }); + return r; }); - return result; + return new CompletableFutureWrapper((CompletionStage) f); } @Override diff --git a/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java index 0168839b2..5ddcd0ace 100644 --- a/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java @@ -40,6 +40,7 @@ import java.lang.reflect.Proxy; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -115,14 +116,14 @@ public class AsyncRemoteProxy extends BaseRemoteProxy { RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), remoteService.getMethodSignature(method), args, optionsCopy, System.currentTimeMillis()); - final RFuture ackFuture; + CompletableFuture ackFuture; if (optionsCopy.isAckExpected()) { ackFuture = pollResponse(optionsCopy.getAckTimeoutInMillis(), requestId, false); } else { ackFuture = null; } - final RPromise responseFuture; + CompletableFuture responseFuture; if (optionsCopy.isResultExpected()) { long timeout = remoteService.getTimeout(optionsCopy.getExecutionTimeoutInMillis(), request); responseFuture = pollResponse(timeout, requestId, false); @@ -157,7 +158,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy { } if (optionsCopy.isAckExpected()) { - ackFuture.onComplete((ack, ex) -> { + ackFuture.whenComplete((ack, ex) -> { if (ex != null) { if (responseFuture != null) { responseFuture.cancel(false); @@ -169,9 +170,9 @@ public class AsyncRemoteProxy extends BaseRemoteProxy { if (ack == null) { String ackName = remoteService.getAckName(requestId); - RFuture ackFutureAttempt = + CompletionStage ackFutureAttempt = tryPollAckAgainAsync(optionsCopy, ackName, requestId); - ackFutureAttempt.onComplete((re, ex2) -> { + ackFutureAttempt.whenComplete((re, ex2) -> { if (ex2 != null) { result.completeExceptionally(ex2); return; @@ -209,9 +210,9 @@ public class AsyncRemoteProxy extends BaseRemoteProxy { } private void awaitResultAsync(RemoteInvocationOptions optionsCopy, RemotePromise result, - String ackName, RFuture responseFuture) { + String ackName, CompletableFuture responseFuture) { RFuture deleteFuture = new RedissonBucket<>(commandExecutor, ackName).deleteAsync(); - deleteFuture.onComplete((res, e) -> { + deleteFuture.whenComplete((res, e) -> { if (e != null) { result.completeExceptionally(e); return; @@ -222,13 +223,13 @@ public class AsyncRemoteProxy extends BaseRemoteProxy { } protected void awaitResultAsync(RemoteInvocationOptions optionsCopy, RemotePromise result, - RFuture responseFuture) { + CompletionStage responseFuture) { // poll for the response only if expected if (!optionsCopy.isResultExpected()) { return; } - responseFuture.onComplete((res, e) -> { + responseFuture.whenComplete((res, e) -> { if (e != null) { result.completeExceptionally(e); return; @@ -408,7 +409,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy { if (!optionsCopy.isResultExpected()) { RemoteInvocationOptions options = new RemoteInvocationOptions(optionsCopy); options.expectResultWithin(60, TimeUnit.SECONDS); - RFuture responseFuture = pollResponse(options.getExecutionTimeoutInMillis(), remotePromise.getRequestId(), false); + CompletionStage responseFuture = pollResponse(options.getExecutionTimeoutInMillis(), remotePromise.getRequestId(), false); awaitResultAsync(options, remotePromise, responseFuture); } } diff --git a/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java index 9df4f5e5a..547260148 100644 --- a/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java @@ -24,13 +24,13 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import org.redisson.remote.ResponseEntry.Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; @@ -70,9 +70,8 @@ public abstract class BaseRemoteProxy { return requestQueueNameCache.computeIfAbsent(remoteInterface, k -> "{" + name + ":" + k.getName() + "}"); } - protected RFuture tryPollAckAgainAsync(RemoteInvocationOptions optionsCopy, - String ackName, RequestId requestId) { - RPromise promise = new RedissonPromise<>(); + protected CompletionStage tryPollAckAgainAsync(RemoteInvocationOptions optionsCopy, + String ackName, RequestId requestId) { RFuture ackClientsFuture = commandExecutor.evalWriteNoRetryAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('setnx', KEYS[1], 1) == 1 then " + "redis.call('pexpire', KEYS[1], ARGV[1]);" @@ -80,33 +79,18 @@ public abstract class BaseRemoteProxy { + "end;" + "redis.call('del', KEYS[1]);" + "return 1;", - Arrays. asList(ackName), optionsCopy.getAckTimeoutInMillis()); - ackClientsFuture.onComplete((res, e) -> { - if (e != null) { - promise.tryFailure(e); - return; - } - + Arrays.asList(ackName), optionsCopy.getAckTimeoutInMillis()); + return ackClientsFuture.thenCompose(res -> { if (res) { - RPromise ackFuture = pollResponse(commandExecutor.getConnectionManager().getConfig().getTimeout(), requestId, true); - ackFuture.onComplete((r, ex) -> { - if (ex != null) { - promise.tryFailure(ex); - return; - } - - promise.trySuccess(r); - }); - } else { - promise.trySuccess(null); + return pollResponse(commandExecutor.getConnectionManager().getConfig().getTimeout(), requestId, true); } + return CompletableFuture.completedFuture(null); }); - return promise; } - protected final RPromise pollResponse(long timeout, + protected final CompletableFuture pollResponse(long timeout, RequestId requestId, boolean insertFirst) { - RPromise responseFuture = new RedissonPromise(); + CompletableFuture responseFuture = new CompletableFuture(); ResponseEntry entry; synchronized (responses) { @@ -125,13 +109,14 @@ public abstract class BaseRemoteProxy { } else { list.add(res); } + } pollResponse(entry); return responseFuture; } - private ScheduledFuture createResponseTimeout(long timeout, RequestId requestId, RPromise responseFuture) { + private ScheduledFuture createResponseTimeout(long timeout, RequestId requestId, CompletableFuture responseFuture) { return commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { @Override public void run() { @@ -142,7 +127,7 @@ public abstract class BaseRemoteProxy { } RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms"); - if (!responseFuture.tryFailure(ex)) { + if (!responseFuture.completeExceptionally(ex)) { return; } @@ -159,8 +144,8 @@ public abstract class BaseRemoteProxy { }, timeout, TimeUnit.MILLISECONDS); } - private void addCancelHandling(RequestId requestId, RPromise responseFuture) { - responseFuture.onComplete((res, ex) -> { + private void addCancelHandling(RequestId requestId, CompletableFuture responseFuture) { + responseFuture.whenComplete((res, ex) -> { if (!responseFuture.isCancelled()) { return; } @@ -201,7 +186,7 @@ public abstract class BaseRemoteProxy { RBlockingQueue queue = getBlockingQueue(responseQueueName, codec); RFuture future = queue.takeAsync(); - future.onComplete(createResponseListener()); + future.whenComplete(createResponseListener()); } private BiConsumer createResponseListener() { @@ -210,8 +195,8 @@ public abstract class BaseRemoteProxy { log.error("Can't get response from " + responseQueueName, e); return; } - - RPromise promise; + + CompletableFuture promise; synchronized (responses) { ResponseEntry entry = responses.get(responseQueueName); if (entry == null) { @@ -222,7 +207,7 @@ public abstract class BaseRemoteProxy { List list = entry.getResponses().get(key); if (list == null) { RBlockingQueue responseQueue = getBlockingQueue(responseQueueName, codec); - responseQueue.takeAsync().onComplete(createResponseListener()); + responseQueue.takeAsync().whenComplete(createResponseListener()); return; } @@ -238,12 +223,12 @@ public abstract class BaseRemoteProxy { responses.remove(responseQueueName, entry); } else { RBlockingQueue responseQueue = getBlockingQueue(responseQueueName, codec); - responseQueue.takeAsync().onComplete(createResponseListener()); + responseQueue.takeAsync().whenComplete(createResponseListener()); } } if (promise != null) { - promise.trySuccess(response); + promise.complete(response); } }; } diff --git a/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java b/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java index d69c488d6..e6c3751cb 100644 --- a/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java @@ -157,7 +157,7 @@ public abstract class BaseRemoteService { RMap canceledRequests = getMap(mapName); RFuture future = canceledRequests.removeAsync(requestId.toString()); - future.onComplete((request, ex) -> { + future.whenComplete((request, ex) -> { if (cancelRequest.isDone()) { return; } diff --git a/redisson/src/main/java/org/redisson/remote/ResponseEntry.java b/redisson/src/main/java/org/redisson/remote/ResponseEntry.java index a94e82706..cd0ac210b 100644 --- a/redisson/src/main/java/org/redisson/remote/ResponseEntry.java +++ b/redisson/src/main/java/org/redisson/remote/ResponseEntry.java @@ -18,11 +18,10 @@ package org.redisson.remote; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; -import org.redisson.misc.RPromise; - /** * * @author Nikita Koksharov @@ -31,18 +30,18 @@ import org.redisson.misc.RPromise; public class ResponseEntry { public static class Result { - - private final RPromise promise; + + private final CompletableFuture promise; private final ScheduledFuture responseTimeoutFuture; - public Result(RPromise promise, ScheduledFuture responseTimeoutFuture) { + public Result(CompletableFuture promise, ScheduledFuture responseTimeoutFuture) { super(); this.promise = promise; this.responseTimeoutFuture = responseTimeoutFuture; } - public RPromise getPromise() { - return (RPromise) promise; + public CompletableFuture getPromise() { + return (CompletableFuture) promise; } public ScheduledFuture getResponseTimeoutFuture() { diff --git a/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java index de365c685..42d4831c9 100644 --- a/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java @@ -16,13 +16,11 @@ package org.redisson.remote; import org.redisson.RedissonBucket; -import org.redisson.api.RFuture; import org.redisson.api.RemoteInvocationOptions; import org.redisson.client.RedisException; import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.executor.RemotePromise; -import org.redisson.misc.RPromise; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; @@ -68,14 +66,14 @@ public class SyncRemoteProxy extends BaseRemoteProxy { RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), remoteService.getMethodSignature(method), args, optionsCopy, System.currentTimeMillis()); - final RFuture ackFuture; + CompletableFuture ackFuture; if (optionsCopy.isAckExpected()) { ackFuture = pollResponse(optionsCopy.getAckTimeoutInMillis(), requestId, false); } else { ackFuture = null; } - - final RPromise responseFuture; + + CompletableFuture responseFuture; if (optionsCopy.isResultExpected()) { long timeout = remoteService.getTimeout(optionsCopy.getExecutionTimeoutInMillis(), request); responseFuture = pollResponse(timeout, requestId, false); @@ -118,7 +116,7 @@ public class SyncRemoteProxy extends BaseRemoteProxy { // skip } if (ack == null) { - RFuture ackFutureAttempt = + CompletionStage ackFutureAttempt = tryPollAckAgainAsync(optionsCopy, ackName, requestId); try { ack = ackFutureAttempt.toCompletableFuture().get(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);