From d446ad427ec0e9484e9bab9aacf0e0b7eee67ea3 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 17 Jan 2022 11:48:13 +0300 Subject: [PATCH] refactoring --- .../org/redisson/RedissonBinaryStream.java | 4 +- .../org/redisson/RedissonBloomFilter.java | 26 +++++---- .../RedissonPriorityBlockingQueue.java | 2 +- .../org/redisson/RedissonRateLimiter.java | 57 ++++++++----------- .../org/redisson/RedissonReliableTopic.java | 6 +- 5 files changed, 44 insertions(+), 51 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBinaryStream.java b/redisson/src/main/java/org/redisson/RedissonBinaryStream.java index e8eee69ac..8cd71670d 100644 --- a/redisson/src/main/java/org/redisson/RedissonBinaryStream.java +++ b/redisson/src/main/java/org/redisson/RedissonBinaryStream.java @@ -215,7 +215,7 @@ public class RedissonBinaryStream extends RedissonBucket implements RBin @Override public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { RFuture res = (RFuture) read(dst); - res.onComplete((r, e) -> { + res.whenComplete((r, e) -> { if (e != null) { handler.failed(e, attachment); } else { @@ -244,7 +244,7 @@ public class RedissonBinaryStream extends RedissonBucket implements RBin @Override public void write(ByteBuffer src, A attachment, CompletionHandler handler) { RFuture res = (RFuture) write(src); - res.onComplete((r, e) -> { + res.whenComplete((r, e) -> { if (e != null) { handler.failed(e, attachment); } else { diff --git a/redisson/src/main/java/org/redisson/RedissonBloomFilter.java b/redisson/src/main/java/org/redisson/RedissonBloomFilter.java index 3726c5c49..e13a4315d 100644 --- a/redisson/src/main/java/org/redisson/RedissonBloomFilter.java +++ b/redisson/src/main/java/org/redisson/RedissonBloomFilter.java @@ -40,12 +40,14 @@ import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandBatchService; +import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.Hash; import java.math.BigDecimal; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; /** @@ -330,39 +332,39 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF @Override public RFuture renameAsync(String newName) { String newConfigName = suffixName(newName, "config"); - RFuture f = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, + RFuture future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, "if redis.call('exists', KEYS[1]) == 1 then " + "redis.call('rename', KEYS[1], ARGV[1]); " + "end; " + "return redis.call('rename', KEYS[2], ARGV[2]); ", Arrays.asList(getRawName(), configName), newName, newConfigName); - f.onComplete((value, e) -> { - if (e == null) { - setName(newName); - this.configName = newConfigName; - } + CompletionStage f = future.thenApply(value -> { + setName(newName); + this.configName = newConfigName; + return value; }); - return f; + return new CompletableFutureWrapper<>(f); } @Override public RFuture renamenxAsync(String newName) { String newConfigName = suffixName(newName, "config"); - RFuture f = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + RFuture future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local r = redis.call('renamenx', KEYS[1], ARGV[1]); " + "if r == 0 then " + " return 0; " + "else " + " return redis.call('renamenx', KEYS[2], ARGV[2]); " + "end; ", - Arrays.asList(getRawName(), configName), newName, newConfigName); - f.onComplete((value, e) -> { - if (e == null && value) { + Arrays.asList(getRawName(), configName), newName, newConfigName); + CompletionStage f = future.thenApply(value -> { + if (value) { setName(newName); this.configName = newConfigName; } + return value; }); - return f; + return new CompletableFutureWrapper<>(f); } private V check(V result) { diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java index 98e1ff034..f4f3eb47a 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java @@ -73,7 +73,7 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i long start = System.currentTimeMillis(); commandExecutor.getConnectionManager().getGroup().schedule(() -> { RFuture future = wrapLockedAsync(command, params); - future.onComplete((res, e) -> { + future.whenComplete((res, e) -> { if (e != null && !(e instanceof RedisConnectionException)) { result.tryFailure(e); return; diff --git a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java index d69b40526..f0c032804 100644 --- a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java +++ b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java @@ -24,10 +24,11 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.MapEntriesDecoder; import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; +import org.redisson.misc.CompletableFutureWrapper; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -95,16 +96,8 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit @Override public RFuture acquireAsync(long permits) { - RPromise promise = new RedissonPromise(); - tryAcquireAsync(permits, -1, null).onComplete((res, e) -> { - if (e != null) { - promise.tryFailure(e); - return; - } - - promise.trySuccess(null); - }); - return promise; + CompletionStage f = tryAcquireAsync(permits, -1, null).thenApply(res -> null); + return new CompletableFutureWrapper<>(f); } @Override @@ -124,59 +117,57 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit @Override public RFuture tryAcquireAsync(long permits, long timeout, TimeUnit unit) { - RPromise promise = new RedissonPromise(); long timeoutInMillis = -1; if (timeout >= 0) { timeoutInMillis = unit.toMillis(timeout); } - tryAcquireAsync(permits, promise, timeoutInMillis); - return promise; + CompletableFuture f = tryAcquireAsync(permits, timeoutInMillis); + return new CompletableFutureWrapper<>(f); } - private void tryAcquireAsync(long permits, RPromise promise, long timeoutInMillis) { + private CompletableFuture tryAcquireAsync(long permits, long timeoutInMillis) { long s = System.currentTimeMillis(); RFuture future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits); - future.onComplete((delay, e) -> { - if (e != null) { - promise.tryFailure(e); - return; - } - + return future.thenCompose(delay -> { if (delay == null) { - promise.trySuccess(true); - return; + return CompletableFuture.completedFuture(true); } if (timeoutInMillis == -1) { + CompletableFuture f = new CompletableFuture<>(); commandExecutor.getConnectionManager().getGroup().schedule(() -> { - tryAcquireAsync(permits, promise, timeoutInMillis); + CompletableFuture r = tryAcquireAsync(permits, timeoutInMillis); + commandExecutor.transfer(r, f); }, delay, TimeUnit.MILLISECONDS); - return; + return f; } long el = System.currentTimeMillis() - s; long remains = timeoutInMillis - el; if (remains <= 0) { - promise.trySuccess(false); - return; + return CompletableFuture.completedFuture(false); } + + CompletableFuture f = new CompletableFuture<>(); if (remains < delay) { commandExecutor.getConnectionManager().getGroup().schedule(() -> { - promise.trySuccess(false); + f.complete(false); }, remains, TimeUnit.MILLISECONDS); } else { long start = System.currentTimeMillis(); commandExecutor.getConnectionManager().getGroup().schedule(() -> { long elapsed = System.currentTimeMillis() - start; if (remains <= elapsed) { - promise.trySuccess(false); + f.complete(false); return; } - - tryAcquireAsync(permits, promise, remains - elapsed); + + CompletableFuture r = tryAcquireAsync(permits, remains - elapsed); + commandExecutor.transfer(r, f); }, delay, TimeUnit.MILLISECONDS); } - }); + return f; + }).toCompletableFuture(); } private RFuture tryAcquireAsync(RedisCommand command, Long value) { diff --git a/redisson/src/main/java/org/redisson/RedissonReliableTopic.java b/redisson/src/main/java/org/redisson/RedissonReliableTopic.java index 0445e475e..093fecf36 100644 --- a/redisson/src/main/java/org/redisson/RedissonReliableTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonReliableTopic.java @@ -184,7 +184,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl private void poll(String id, StreamMessageId startId) { readFuture = commandExecutor.readAsync(getRawName(), new CompositeCodec(StringCodec.INSTANCE, codec), RedisCommands.XREAD_BLOCKING_SINGLE, "BLOCK", 0, "STREAMS", getRawName(), startId); - readFuture.onComplete((res, ex) -> { + readFuture.whenComplete((res, ex) -> { if (readFuture.isCancelled()) { return; } @@ -246,7 +246,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl + "return r ~= false; ", Arrays.asList(getRawName(), getSubscribersName(), getMapName(), getCounter(), getTimeout()), lastId, id, time); - updateFuture.onComplete((re, exc) -> { + updateFuture.whenComplete((re, exc) -> { if (exc != null) { if (exc instanceof RedissonShutdownException) { return; @@ -333,7 +333,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl + "return 1; ", Arrays.asList(getTimeout()), System.currentTimeMillis() + commandExecutor.getConnectionManager().getCfg().getReliableTopicWatchdogTimeout(), subscriberId.get()); - future.onComplete((res, e) -> { + future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update reliable topic " + getRawName() + " expiration time", e); return;