refactoring

pull/4272/head
Nikita Koksharov 3 years ago
parent dd3ac01e35
commit 5a7a8bd253

@ -20,13 +20,9 @@ import org.redisson.api.RLock;
import org.redisson.api.RLockAsync;
import org.redisson.client.RedisResponseTimeoutException;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
/**
@ -77,10 +73,9 @@ public class RedissonMultiLock implements RLock {
acquiredLocks = new ArrayList<>(locks.size());
}
void tryAcquireLockAsync(ListIterator<RLock> iterator, RPromise<Boolean> result) {
CompletionStage<Boolean> tryAcquireLockAsync(ListIterator<RLock> iterator) {
if (!iterator.hasNext()) {
checkLeaseTimeAsync(result);
return;
return checkLeaseTimeAsync();
}
RLock lock = iterator.next();
@ -91,35 +86,28 @@ public class RedissonMultiLock implements RLock {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquiredFuture = lock.tryLockAsync(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS, threadId);
}
lockAcquiredFuture.whenComplete((res, e) -> {
return lockAcquiredFuture
.exceptionally(e -> null)
.thenCompose(res -> {
boolean lockAcquired = false;
if (res != null) {
lockAcquired = res;
}
if (e instanceof RedisResponseTimeoutException) {
} else {
unlockInnerAsync(Arrays.asList(lock), threadId);
}
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
checkLeaseTimeAsync(result);
return;
return checkLeaseTimeAsync();
}
if (failedLocksLimit == 0) {
unlockInnerAsync(acquiredLocks, threadId).onComplete((r, ex) -> {
if (ex != null) {
result.tryFailure(ex);
return;
}
return unlockInnerAsync(acquiredLocks, threadId).thenCompose(r -> {
if (waitTime <= 0) {
result.trySuccess(false);
return;
return CompletableFuture.completedFuture(false);
}
failedLocksLimit = failedLocksLimit();
@ -129,58 +117,41 @@ public class RedissonMultiLock implements RLock {
iterator.previous();
}
checkRemainTimeAsync(iterator, result);
return checkRemainTimeAsync(iterator);
});
return;
} else {
failedLocksLimit--;
}
}
checkRemainTimeAsync(iterator, result);
return checkRemainTimeAsync(iterator);
});
}
private void checkLeaseTimeAsync(RPromise<Boolean> result) {
private CompletableFuture<Boolean> checkLeaseTimeAsync() {
if (leaseTime > 0) {
AtomicInteger counter = new AtomicInteger(acquiredLocks.size());
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonBaseLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
if (counter.decrementAndGet() == 0) {
result.trySuccess(true);
}
});
futures.add(future.toCompletableFuture());
}
return;
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return f.thenApply(r -> true);
}
result.trySuccess(true);
return CompletableFuture.completedFuture(true);
}
private void checkRemainTimeAsync(ListIterator<RLock> iterator, RPromise<Boolean> result) {
private CompletionStage<Boolean> checkRemainTimeAsync(ListIterator<RLock> iterator) {
if (remainTime > 0) {
remainTime += -(System.currentTimeMillis() - time);
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInnerAsync(acquiredLocks, threadId).onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
result.trySuccess(false);
});
return;
return unlockInnerAsync(acquiredLocks, threadId).thenApply(res -> false);
}
}
tryAcquireLockAsync(iterator, result);
return tryAcquireLockAsync(iterator);
}
}
@ -240,23 +211,17 @@ public class RedissonMultiLock implements RLock {
waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
}
}
RPromise<Void> result = new RedissonPromise<Void>();
tryLockAsync(threadId, leaseTime, TimeUnit.MILLISECONDS, waitTime, result);
return result;
CompletionStage<Void> f = tryLockAsyncCycle(threadId, leaseTime, TimeUnit.MILLISECONDS, waitTime);
return new CompletableFutureWrapper<>(f);
}
protected void tryLockAsync(long threadId, long leaseTime, TimeUnit unit, long waitTime, RPromise<Void> result) {
tryLockAsync(waitTime, leaseTime, unit, threadId).onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
protected CompletionStage<Void> tryLockAsyncCycle(long threadId, long leaseTime, TimeUnit unit, long waitTime) {
return tryLockAsync(waitTime, leaseTime, unit, threadId).thenCompose(res -> {
if (res) {
result.trySuccess(null);
return CompletableFuture.completedFuture(null);
} else {
tryLockAsync(threadId, leaseTime, unit, waitTime, result);
return tryLockAsyncCycle(threadId, leaseTime, unit, waitTime);
}
});
}
@ -312,25 +277,13 @@ public class RedissonMultiLock implements RLock {
}
protected RFuture<Void> unlockInnerAsync(Collection<RLock> locks, long threadId) {
if (locks.isEmpty()) {
return new CompletableFutureWrapper<>((Void) null);
}
RPromise<Void> result = new RedissonPromise<Void>();
AtomicInteger counter = new AtomicInteger(locks.size());
List<CompletableFuture<Void>> futures = new ArrayList<>(locks.size());
for (RLock lock : locks) {
lock.unlockAsync(threadId).onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
if (counter.decrementAndGet() == 0) {
result.trySuccess(null);
}
});
RFuture<Void> f = lock.unlockAsync(threadId);
futures.add(f.toCompletableFuture());
}
return result;
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return new CompletableFutureWrapper<>(future);
}
@Override
@ -429,10 +382,9 @@ public class RedissonMultiLock implements RLock {
@Override
public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RPromise<Boolean> result = new RedissonPromise<Boolean>();
LockState state = new LockState(waitTime, leaseTime, unit, threadId);
state.tryAcquireLockAsync(locks.listIterator(), result);
return result;
CompletionStage<Boolean> f = state.tryAcquireLockAsync(locks.listIterator());
return new CompletableFutureWrapper<>(f);
}
@Override

@ -25,14 +25,14 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.CompletableFutureWrapper;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import static org.redisson.client.protocol.RedisCommands.EVAL_LIST_SCAN;
@ -228,37 +228,38 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
}
@Override
public RFuture<Boolean> addAsync(final V value) {
final RPromise<Boolean> promise = new RedissonPromise<Boolean>();
CompletableFuture<Boolean> promise = new CompletableFuture<>();
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
public void run() {
try {
boolean res = add(value);
promise.trySuccess(res);
promise.complete(res);
} catch (Exception e) {
promise.tryFailure(e);
promise.completeExceptionally(e);
}
}
});
return promise;
return new CompletableFutureWrapper<>(promise);
}
@Override
public RFuture<Boolean> removeAsync(final Object value) {
final RPromise<Boolean> promise = new RedissonPromise<Boolean>();
CompletableFuture<Boolean> promise = new CompletableFuture<>();
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
boolean result = remove(value);
promise.trySuccess(result);
promise.complete(result);
} catch (Exception e) {
promise.tryFailure(e);
promise.completeExceptionally(e);
}
}
});
return promise;
return new CompletableFutureWrapper<>(promise);
}
@Override
@ -305,7 +306,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
public boolean retainAll(Collection<?> c) {
boolean changed = false;
for (Iterator<?> iterator = iterator(); iterator.hasNext();) {
Object object = (Object) iterator.next();
Object object = iterator.next();
if (!c.contains(object)) {
iterator.remove();
changed = true;
@ -385,7 +386,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
+ "else "
+ "return 0; "
+ "end",
Arrays.<Object>asList(getRawName(), getComparatorKeyName()), comparatorSign));
Arrays.asList(getRawName(), getComparatorKeyName()), comparatorSign));
if (res) {
this.comparator = comparator;
}
@ -409,7 +410,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
protected void remove(Object value) {
RedissonSortedSet.this.remove((V) value);
RedissonSortedSet.this.remove(value);
}
};
}
@ -437,7 +438,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
+ "end; "
+ "redis.call('setex', KEYS[2], 3600, end_index);"
+ "return {end_index, result};",
Arrays.<Object>asList(getRawName(), iteratorName), count);
Arrays.asList(getRawName(), iteratorName), count);
}
// TODO optimize: get three values each time instead of single

@ -33,8 +33,6 @@ import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.executor.RemotePromise;
import org.redisson.iterator.RedissonListIterator;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RemoteServiceRequest;
import java.time.Duration;
@ -187,10 +185,9 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
return true;
}
@Override
public RFuture<Boolean> tryTransferAsync(V v, long timeout, TimeUnit unit) {
RPromise<Boolean> result = new RedissonPromise<>();
result.setUncancellable();
CompletableFuture<Boolean> result = new CompletableFuture<>();
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v).toCompletableFuture();
long remainTime = unit.toMillis(timeout);
@ -205,34 +202,34 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
future.whenComplete((res, exc) -> {
if (future.isCancelled()) {
result.trySuccess(false);
result.complete(false);
return;
}
timeoutFuture.cancel();
if (exc != null) {
result.tryFailure(exc);
result.completeExceptionally(exc);
return;
}
result.trySuccess(true);
result.complete(true);
});
future.getAddFuture().whenComplete((added, e) -> {
if (future.getAddFuture().isCancelled()) {
result.trySuccess(false);
result.complete(false);
return;
}
if (e != null) {
timeoutFuture.cancel();
result.tryFailure(e);
result.completeExceptionally(e);
return;
}
if (!added) {
timeoutFuture.cancel();
result.trySuccess(false);
result.complete(false);
return;
}
@ -240,13 +237,13 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
future.cancelAsync(false).whenComplete((canceled, ex) -> {
if (ex != null) {
timeoutFuture.cancel();
result.tryFailure(ex);
result.completeExceptionally(ex);
return;
}
if (canceled) {
timeoutFuture.cancel();
result.trySuccess(false);
result.complete(false);
}
});
};
@ -260,7 +257,7 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
task.run();
}
});
return result;
return new CompletableFutureWrapper<>(result);
}
@Override

Loading…
Cancel
Save