refactoring

pull/4104/head
Nikita Koksharov 3 years ago
parent 5079cb8462
commit ce6eed8fdf

@ -24,8 +24,6 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.SemaphorePubSub;
import java.util.Arrays;
@ -141,7 +139,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return new CompletableFutureWrapper<>(f);
}
private void tryAcquireAsync(AtomicLong time, int permits, RedissonLockEntry entry, RPromise<String> result, long ttl, TimeUnit timeUnit) {
private void tryAcquireAsync(AtomicLong time, int permits, RedissonLockEntry entry, CompletableFuture<String> result, long ttl, TimeUnit timeUnit) {
if (result.isDone()) {
unsubscribe(entry);
return;
@ -149,17 +147,17 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
if (time.get() <= 0) {
unsubscribe(entry);
result.trySuccess(null);
result.complete(null);
return;
}
long timeoutDate = calcTimeout(ttl, timeUnit);
long curr = System.currentTimeMillis();
RFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
tryAcquireFuture.onComplete((permitId, e) -> {
tryAcquireFuture.whenComplete((permitId, e) -> {
if (e != null) {
unsubscribe(entry);
result.tryFailure(e);
result.completeExceptionally(e);
return;
}
@ -167,7 +165,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
if (permitId != null) {
if (!permitId.startsWith(":")) {
unsubscribe(entry);
if (!result.trySuccess(permitId)) {
if (!result.complete(permitId)) {
releaseAsync(permitId);
}
return;
@ -183,7 +181,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
if (time.get() <= 0) {
unsubscribe(entry);
result.trySuccess(null);
result.complete(null);
return;
}
@ -378,9 +376,11 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"return ':' .. tostring(v[2]); " +
"end " +
"return nil;",
Arrays.<Object>asList(getRawName(), timeoutName, getChannelName()), permits, timeoutDate, id, System.currentTimeMillis(), nonExpirableTimeout);
Arrays.<Object>asList(getRawName(), timeoutName, getChannelName()),
permits, timeoutDate, id, System.currentTimeMillis(), nonExpirableTimeout);
}
@Override
public RFuture<String> tryAcquireAsync(long waitTime, TimeUnit unit) {
return tryAcquireAsync(1, waitTime, -1, unit);
}
@ -465,19 +465,19 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
private RFuture<String> tryAcquireAsync(int permits, long waitTime, long ttl, TimeUnit timeUnit) {
RPromise<String> result = new RedissonPromise<String>();
CompletableFuture<String> result = new CompletableFuture<>();
AtomicLong time = new AtomicLong(timeUnit.toMillis(waitTime));
long curr = System.currentTimeMillis();
long timeoutDate = calcTimeout(ttl, timeUnit);
RFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
tryAcquireFuture.onComplete((permitId, e) -> {
tryAcquireFuture.whenComplete((permitId, e) -> {
if (e != null) {
result.tryFailure(e);
result.completeExceptionally(e);
return;
}
if (permitId != null && !permitId.startsWith(":")) {
if (!result.trySuccess(permitId)) {
if (!result.complete(permitId)) {
releaseAsync(permitId);
}
return;
@ -487,7 +487,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
time.addAndGet(-el);
if (time.get() <= 0) {
result.trySuccess(null);
result.complete(null);
return;
}
@ -496,7 +496,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
subscribeFuture.whenComplete((r, ex) -> {
if (ex != null) {
result.tryFailure(ex);
result.completeExceptionally(ex);
return;
}
@ -515,7 +515,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override
public void run(Timeout timeout) throws Exception {
if (!subscribeFuture.isDone()) {
result.trySuccess(null);
result.complete(null);
}
}
}, time.get(), TimeUnit.MILLISECONDS);
@ -523,7 +523,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
});
return result;
return new CompletableFutureWrapper<>(result);
}
private CompletableFuture<RedissonLockEntry> subscribe() {

Loading…
Cancel
Save