refactoring

pull/4104/head
Nikita Koksharov 3 years ago
parent ce6eed8fdf
commit d38af27e96

@ -254,7 +254,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
if (hasNoLoader()) {
return new CompletableFutureWrapper<>(null);
return new CompletableFutureWrapper((Void)null);
}
CompletableFuture<V> future = loadValue((K) key, false);
@ -651,7 +651,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
broadcastLocalCacheStore(entry.getValue(), keyEncoded, cacheKey);
}
return new CompletableFutureWrapper<>(null);
return new CompletableFutureWrapper((Void)null);
}
List<Object> params = new ArrayList<Object>(map.size()*3);
@ -1047,7 +1047,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return new CompletableFutureWrapper<>((V) prevValue.getValue());
} else {
mapKey.release();
return new CompletableFutureWrapper<>(null);
return new CompletableFutureWrapper((Void)null);
}
}
@ -1176,7 +1176,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return new CompletableFutureWrapper<>((V) prevValue.getValue());
} else {
mapKey.release();
return new CompletableFutureWrapper<>(null);
return new CompletableFutureWrapper((Void)null);
}
}
@ -1199,7 +1199,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheValue prevValue = cachePutIfAbsent(cacheKey, key, value);
if (prevValue == null) {
broadcastLocalCacheStore((V) value, mapKey, cacheKey);
return new CompletableFutureWrapper<>(null);
return new CompletableFutureWrapper((Void)null);
} else {
mapKey.release();
return new CompletableFutureWrapper<>((V) prevValue.getValue());

@ -23,8 +23,7 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.pubsub.SemaphorePubSub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -103,16 +102,16 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public RFuture<Void> acquireAsync(int permits) {
RPromise<Void> result = new RedissonPromise<Void>();
CompletableFuture<Void> result = new CompletableFuture<>();
RFuture<Boolean> tryAcquireFuture = tryAcquireAsync(permits);
tryAcquireFuture.onComplete((res, e) -> {
tryAcquireFuture.whenComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
result.completeExceptionally(e);
return;
}
if (res) {
if (!result.trySuccess(null)) {
if (!result.complete(null)) {
releaseAsync(permits);
}
return;
@ -121,17 +120,17 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
subscribeFuture.whenComplete((r, e1) -> {
if (e1 != null) {
result.tryFailure(e1);
result.completeExceptionally(e1);
return;
}
acquireAsync(permits, r, result);
});
});
return result;
return new CompletableFutureWrapper<>(result);
}
private void tryAcquireAsync(AtomicLong time, int permits, RedissonLockEntry entry, RPromise<Boolean> result) {
private void tryAcquireAsync(AtomicLong time, int permits, RedissonLockEntry entry, CompletableFuture<Boolean> result) {
if (result.isDone()) {
unsubscribe(entry);
return;
@ -139,22 +138,22 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
if (time.get() <= 0) {
unsubscribe(entry);
result.trySuccess(false);
result.complete(false);
return;
}
long curr = System.currentTimeMillis();
RFuture<Boolean> tryAcquireFuture = tryAcquireAsync(permits);
tryAcquireFuture.onComplete((res, e) -> {
tryAcquireFuture.whenComplete((res, e) -> {
if (e != null) {
unsubscribe(entry);
result.tryFailure(e);
result.completeExceptionally(e);
return;
}
if (res) {
unsubscribe(entry);
if (!result.trySuccess(true)) {
if (!result.complete(true)) {
releaseAsync(permits);
}
return;
@ -165,7 +164,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
if (time.get() <= 0) {
unsubscribe(entry);
result.trySuccess(false);
result.complete(false);
return;
}
@ -175,7 +174,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
tryAcquireAsync(time, permits, entry, result);
} else {
AtomicBoolean executed = new AtomicBoolean();
AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
AtomicReference<Timeout> futureRef = new AtomicReference<>();
Runnable listener = () -> {
executed.set(true);
if (futureRef.get() != null && !futureRef.get().cancel()) {
@ -208,23 +207,23 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
});
}
private void acquireAsync(int permits, RedissonLockEntry entry, RPromise<Void> result) {
private void acquireAsync(int permits, RedissonLockEntry entry, CompletableFuture<Void> result) {
if (result.isDone()) {
unsubscribe(entry);
return;
}
RFuture<Boolean> tryAcquireFuture = tryAcquireAsync(permits);
tryAcquireFuture.onComplete((res, e) -> {
tryAcquireFuture.whenComplete((res, e) -> {
if (e != null) {
unsubscribe(entry);
result.tryFailure(e);
result.completeExceptionally(e);
return;
}
if (res) {
unsubscribe(entry);
if (!result.trySuccess(null)) {
if (!result.complete(null)) {
releaseAsync(permits);
}
return;
@ -261,7 +260,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return RedissonPromise.newSucceededFuture(true);
return new CompletableFutureWrapper<>(true);
}
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
@ -347,18 +346,18 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public RFuture<Boolean> tryAcquireAsync(int permits, long waitTime, TimeUnit unit) {
RPromise<Boolean> result = new RedissonPromise<Boolean>();
CompletableFuture<Boolean> result = new CompletableFuture<>();
AtomicLong time = new AtomicLong(unit.toMillis(waitTime));
long curr = System.currentTimeMillis();
RFuture<Boolean> tryAcquireFuture = tryAcquireAsync(permits);
tryAcquireFuture.onComplete((res, e) -> {
tryAcquireFuture.whenComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
result.completeExceptionally(e);
return;
}
if (res) {
if (!result.trySuccess(true)) {
if (!result.complete(true)) {
releaseAsync(permits);
}
return;
@ -368,7 +367,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
time.addAndGet(-elap);
if (time.get() <= 0) {
result.trySuccess(false);
result.complete(false);
return;
}
@ -377,7 +376,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
subscribeFuture.whenComplete((r, ex) -> {
if (ex != null) {
result.tryFailure(ex);
result.completeExceptionally(ex);
return;
}
@ -390,7 +389,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
if (time.get() < 0) {
unsubscribe(r);
result.trySuccess(false);
result.complete(false);
return;
}
@ -402,14 +401,14 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public void run(Timeout timeout) throws Exception {
if (!subscribeFuture.isDone()) {
result.trySuccess(false);
result.complete(false);
}
}
}, time.get(), TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture);
}
});
return result;
return new CompletableFutureWrapper<>(result);
}
private CompletableFuture<RedissonLockEntry> subscribe() {
@ -446,7 +445,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return RedissonPromise.newSucceededFuture(null);
return new CompletableFutureWrapper<>((Void)null);
}
RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
@ -454,10 +453,8 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
"redis.call('publish', KEYS[2], value); ",
Arrays.asList(getRawName(), getChannelName()), permits);
if (log.isDebugEnabled()) {
future.onComplete((o, e) -> {
if (e == null) {
log.debug("released, permits: {}, name: {}", permits, getName());
}
future.thenAccept(o -> {
log.debug("released, permits: {}, name: {}", permits, getName());
});
}
return future;
@ -508,7 +505,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
Arrays.asList(getRawName(), getChannelName()), permits);
if (log.isDebugEnabled()) {
future.onComplete((r, e) -> {
future.thenAccept(r -> {
if (r) {
log.debug("permits set, permits: {}, name: {}", permits, getName());
} else {

Loading…
Cancel
Save