From 5a7a8bd253372274d7e1a322250a9437362200eb Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 26 Apr 2022 15:45:58 +0300 Subject: [PATCH] refactoring --- .../java/org/redisson/RedissonMultiLock.java | 124 ++++++------------ .../java/org/redisson/RedissonSortedSet.java | 29 ++-- .../org/redisson/RedissonTransferQueue.java | 25 ++-- 3 files changed, 64 insertions(+), 114 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonMultiLock.java b/redisson/src/main/java/org/redisson/RedissonMultiLock.java index ea257a3ab..1f0111756 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultiLock.java +++ b/redisson/src/main/java/org/redisson/RedissonMultiLock.java @@ -20,13 +20,9 @@ import org.redisson.api.RLock; import org.redisson.api.RLockAsync; import org.redisson.client.RedisResponseTimeoutException; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import java.util.*; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.*; import java.util.concurrent.locks.Condition; /** @@ -77,10 +73,9 @@ public class RedissonMultiLock implements RLock { acquiredLocks = new ArrayList<>(locks.size()); } - void tryAcquireLockAsync(ListIterator iterator, RPromise result) { + CompletionStage tryAcquireLockAsync(ListIterator iterator) { if (!iterator.hasNext()) { - checkLeaseTimeAsync(result); - return; + return checkLeaseTimeAsync(); } RLock lock = iterator.next(); @@ -91,35 +86,28 @@ public class RedissonMultiLock implements RLock { long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquiredFuture = lock.tryLockAsync(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS, threadId); } - - lockAcquiredFuture.whenComplete((res, e) -> { + + return lockAcquiredFuture + .exceptionally(e -> null) + .thenCompose(res -> { boolean lockAcquired = false; if (res != null) { lockAcquired = res; - } - - if (e instanceof RedisResponseTimeoutException) { + } else { unlockInnerAsync(Arrays.asList(lock), threadId); } - + if (lockAcquired) { acquiredLocks.add(lock); } else { if (locks.size() - acquiredLocks.size() == failedLocksLimit()) { - checkLeaseTimeAsync(result); - return; + return checkLeaseTimeAsync(); } if (failedLocksLimit == 0) { - unlockInnerAsync(acquiredLocks, threadId).onComplete((r, ex) -> { - if (ex != null) { - result.tryFailure(ex); - return; - } - + return unlockInnerAsync(acquiredLocks, threadId).thenCompose(r -> { if (waitTime <= 0) { - result.trySuccess(false); - return; + return CompletableFuture.completedFuture(false); } failedLocksLimit = failedLocksLimit(); @@ -129,58 +117,41 @@ public class RedissonMultiLock implements RLock { iterator.previous(); } - checkRemainTimeAsync(iterator, result); + return checkRemainTimeAsync(iterator); }); - return; } else { failedLocksLimit--; } } - checkRemainTimeAsync(iterator, result); + return checkRemainTimeAsync(iterator); }); } - private void checkLeaseTimeAsync(RPromise result) { + private CompletableFuture checkLeaseTimeAsync() { if (leaseTime > 0) { - AtomicInteger counter = new AtomicInteger(acquiredLocks.size()); + List> futures = new ArrayList<>(); for (RLock rLock : acquiredLocks) { RFuture future = ((RedissonBaseLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); - future.onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - - if (counter.decrementAndGet() == 0) { - result.trySuccess(true); - } - }); + futures.add(future.toCompletableFuture()); } - return; + CompletableFuture f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + return f.thenApply(r -> true); } - - result.trySuccess(true); + + return CompletableFuture.completedFuture(true); } - private void checkRemainTimeAsync(ListIterator iterator, RPromise result) { + private CompletionStage checkRemainTimeAsync(ListIterator iterator) { if (remainTime > 0) { remainTime += -(System.currentTimeMillis() - time); time = System.currentTimeMillis(); if (remainTime <= 0) { - unlockInnerAsync(acquiredLocks, threadId).onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - - result.trySuccess(false); - }); - return; + return unlockInnerAsync(acquiredLocks, threadId).thenApply(res -> false); } } - tryAcquireLockAsync(iterator, result); + return tryAcquireLockAsync(iterator); } } @@ -240,23 +211,17 @@ public class RedissonMultiLock implements RLock { waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime); } } - - RPromise result = new RedissonPromise(); - tryLockAsync(threadId, leaseTime, TimeUnit.MILLISECONDS, waitTime, result); - return result; + + CompletionStage f = tryLockAsyncCycle(threadId, leaseTime, TimeUnit.MILLISECONDS, waitTime); + return new CompletableFutureWrapper<>(f); } - protected void tryLockAsync(long threadId, long leaseTime, TimeUnit unit, long waitTime, RPromise result) { - tryLockAsync(waitTime, leaseTime, unit, threadId).onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + protected CompletionStage tryLockAsyncCycle(long threadId, long leaseTime, TimeUnit unit, long waitTime) { + return tryLockAsync(waitTime, leaseTime, unit, threadId).thenCompose(res -> { if (res) { - result.trySuccess(null); + return CompletableFuture.completedFuture(null); } else { - tryLockAsync(threadId, leaseTime, unit, waitTime, result); + return tryLockAsyncCycle(threadId, leaseTime, unit, waitTime); } }); } @@ -312,25 +277,13 @@ public class RedissonMultiLock implements RLock { } protected RFuture unlockInnerAsync(Collection locks, long threadId) { - if (locks.isEmpty()) { - return new CompletableFutureWrapper<>((Void) null); - } - - RPromise result = new RedissonPromise(); - AtomicInteger counter = new AtomicInteger(locks.size()); + List> futures = new ArrayList<>(locks.size()); for (RLock lock : locks) { - lock.unlockAsync(threadId).onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - - if (counter.decrementAndGet() == 0) { - result.trySuccess(null); - } - }); + RFuture f = lock.unlockAsync(threadId); + futures.add(f.toCompletableFuture()); } - return result; + CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + return new CompletableFutureWrapper<>(future); } @Override @@ -429,10 +382,9 @@ public class RedissonMultiLock implements RLock { @Override public RFuture tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { - RPromise result = new RedissonPromise(); LockState state = new LockState(waitTime, leaseTime, unit, threadId); - state.tryAcquireLockAsync(locks.listIterator(), result); - return result; + CompletionStage f = state.tryAcquireLockAsync(locks.listIterator()); + return new CompletableFutureWrapper<>(f); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonSortedSet.java b/redisson/src/main/java/org/redisson/RedissonSortedSet.java index 413c78abd..70eba86ef 100644 --- a/redisson/src/main/java/org/redisson/RedissonSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSortedSet.java @@ -25,14 +25,14 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.iterator.RedissonBaseIterator; import org.redisson.mapreduce.RedissonCollectionMapReduce; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; +import org.redisson.misc.CompletableFutureWrapper; import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; import java.math.BigInteger; import java.security.MessageDigest; import java.util.*; +import java.util.concurrent.CompletableFuture; import static org.redisson.client.protocol.RedisCommands.EVAL_LIST_SCAN; @@ -228,37 +228,38 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet addAsync(final V value) { - final RPromise promise = new RedissonPromise(); + CompletableFuture promise = new CompletableFuture<>(); commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { public void run() { try { boolean res = add(value); - promise.trySuccess(res); + promise.complete(res); } catch (Exception e) { - promise.tryFailure(e); + promise.completeExceptionally(e); } } }); - return promise; + return new CompletableFutureWrapper<>(promise); } @Override public RFuture removeAsync(final Object value) { - final RPromise promise = new RedissonPromise(); + CompletableFuture promise = new CompletableFuture<>(); commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { @Override public void run() { try { boolean result = remove(value); - promise.trySuccess(result); + promise.complete(result); } catch (Exception e) { - promise.tryFailure(e); + promise.completeExceptionally(e); } } }); - return promise; + return new CompletableFutureWrapper<>(promise); } @Override @@ -305,7 +306,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet c) { boolean changed = false; for (Iterator iterator = iterator(); iterator.hasNext();) { - Object object = (Object) iterator.next(); + Object object = iterator.next(); if (!c.contains(object)) { iterator.remove(); changed = true; @@ -385,7 +386,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSetasList(getRawName(), getComparatorKeyName()), comparatorSign)); + Arrays.asList(getRawName(), getComparatorKeyName()), comparatorSign)); if (res) { this.comparator = comparator; } @@ -409,7 +410,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet extends RedissonObject implements RSortedSetasList(getRawName(), iteratorName), count); + Arrays.asList(getRawName(), iteratorName), count); } // TODO optimize: get three values each time instead of single diff --git a/redisson/src/main/java/org/redisson/RedissonTransferQueue.java b/redisson/src/main/java/org/redisson/RedissonTransferQueue.java index 9abd611aa..9bf6fdd41 100644 --- a/redisson/src/main/java/org/redisson/RedissonTransferQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonTransferQueue.java @@ -33,8 +33,6 @@ import org.redisson.connection.decoder.ListDrainToDecoder; import org.redisson.executor.RemotePromise; import org.redisson.iterator.RedissonListIterator; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import org.redisson.remote.RemoteServiceRequest; import java.time.Duration; @@ -187,10 +185,9 @@ public class RedissonTransferQueue extends RedissonExpirable implements RTran return true; } + @Override public RFuture tryTransferAsync(V v, long timeout, TimeUnit unit) { - RPromise result = new RedissonPromise<>(); - result.setUncancellable(); - + CompletableFuture result = new CompletableFuture<>(); RemotePromise future = (RemotePromise) service.invoke(v).toCompletableFuture(); long remainTime = unit.toMillis(timeout); @@ -205,34 +202,34 @@ public class RedissonTransferQueue extends RedissonExpirable implements RTran future.whenComplete((res, exc) -> { if (future.isCancelled()) { - result.trySuccess(false); + result.complete(false); return; } timeoutFuture.cancel(); if (exc != null) { - result.tryFailure(exc); + result.completeExceptionally(exc); return; } - result.trySuccess(true); + result.complete(true); }); future.getAddFuture().whenComplete((added, e) -> { if (future.getAddFuture().isCancelled()) { - result.trySuccess(false); + result.complete(false); return; } if (e != null) { timeoutFuture.cancel(); - result.tryFailure(e); + result.completeExceptionally(e); return; } if (!added) { timeoutFuture.cancel(); - result.trySuccess(false); + result.complete(false); return; } @@ -240,13 +237,13 @@ public class RedissonTransferQueue extends RedissonExpirable implements RTran future.cancelAsync(false).whenComplete((canceled, ex) -> { if (ex != null) { timeoutFuture.cancel(); - result.tryFailure(ex); + result.completeExceptionally(ex); return; } if (canceled) { timeoutFuture.cancel(); - result.trySuccess(false); + result.complete(false); } }); }; @@ -260,7 +257,7 @@ public class RedissonTransferQueue extends RedissonExpirable implements RTran task.run(); } }); - return result; + return new CompletableFutureWrapper<>(result); } @Override