refactoring

pull/4087/head
Nikita Koksharov 3 years ago
parent 3baa322917
commit d446ad427e

@ -215,7 +215,7 @@ public class RedissonBinaryStream extends RedissonBucket<byte[]> implements RBin
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
RFuture<Integer> res = (RFuture<Integer>) read(dst);
res.onComplete((r, e) -> {
res.whenComplete((r, e) -> {
if (e != null) {
handler.failed(e, attachment);
} else {
@ -244,7 +244,7 @@ public class RedissonBinaryStream extends RedissonBucket<byte[]> implements RBin
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
RFuture<Integer> res = (RFuture<Integer>) write(src);
res.onComplete((r, e) -> {
res.whenComplete((r, e) -> {
if (e != null) {
handler.failed(e, attachment);
} else {

@ -40,12 +40,14 @@ import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.Hash;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
@ -330,39 +332,39 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
@Override
public RFuture<Void> renameAsync(String newName) {
String newConfigName = suffixName(newName, "config");
RFuture<Void> f = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"if redis.call('exists', KEYS[1]) == 1 then " +
"redis.call('rename', KEYS[1], ARGV[1]); " +
"end; " +
"return redis.call('rename', KEYS[2], ARGV[2]); ",
Arrays.<Object>asList(getRawName(), configName), newName, newConfigName);
f.onComplete((value, e) -> {
if (e == null) {
CompletionStage<Void> f = future.thenApply(value -> {
setName(newName);
this.configName = newConfigName;
}
return value;
});
return f;
return new CompletableFutureWrapper<>(f);
}
@Override
public RFuture<Boolean> renamenxAsync(String newName) {
String newConfigName = suffixName(newName, "config");
RFuture<Boolean> f = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local r = redis.call('renamenx', KEYS[1], ARGV[1]); "
+ "if r == 0 then "
+ " return 0; "
+ "else "
+ " return redis.call('renamenx', KEYS[2], ARGV[2]); "
+ "end; ",
Arrays.<Object>asList(getRawName(), configName), newName, newConfigName);
f.onComplete((value, e) -> {
if (e == null && value) {
Arrays.asList(getRawName(), configName), newName, newConfigName);
CompletionStage<Boolean> f = future.thenApply(value -> {
if (value) {
setName(newName);
this.configName = newConfigName;
}
return value;
});
return f;
return new CompletableFutureWrapper<>(f);
}
private <V> V check(V result) {

@ -73,7 +73,7 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
long start = System.currentTimeMillis();
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
RFuture<V> future = wrapLockedAsync(command, params);
future.onComplete((res, e) -> {
future.whenComplete((res, e) -> {
if (e != null && !(e instanceof RedisConnectionException)) {
result.tryFailure(e);
return;

@ -24,10 +24,11 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MapEntriesDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.CompletableFutureWrapper;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@ -95,16 +96,8 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit
@Override
public RFuture<Void> acquireAsync(long permits) {
RPromise<Void> promise = new RedissonPromise<Void>();
tryAcquireAsync(permits, -1, null).onComplete((res, e) -> {
if (e != null) {
promise.tryFailure(e);
return;
}
promise.trySuccess(null);
});
return promise;
CompletionStage<Void> f = tryAcquireAsync(permits, -1, null).thenApply(res -> null);
return new CompletableFutureWrapper<>(f);
}
@Override
@ -124,59 +117,57 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit
@Override
public RFuture<Boolean> tryAcquireAsync(long permits, long timeout, TimeUnit unit) {
RPromise<Boolean> promise = new RedissonPromise<Boolean>();
long timeoutInMillis = -1;
if (timeout >= 0) {
timeoutInMillis = unit.toMillis(timeout);
}
tryAcquireAsync(permits, promise, timeoutInMillis);
return promise;
CompletableFuture<Boolean> f = tryAcquireAsync(permits, timeoutInMillis);
return new CompletableFutureWrapper<>(f);
}
private void tryAcquireAsync(long permits, RPromise<Boolean> promise, long timeoutInMillis) {
private CompletableFuture<Boolean> tryAcquireAsync(long permits, long timeoutInMillis) {
long s = System.currentTimeMillis();
RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
future.onComplete((delay, e) -> {
if (e != null) {
promise.tryFailure(e);
return;
}
return future.thenCompose(delay -> {
if (delay == null) {
promise.trySuccess(true);
return;
return CompletableFuture.completedFuture(true);
}
if (timeoutInMillis == -1) {
CompletableFuture<Boolean> f = new CompletableFuture<>();
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
tryAcquireAsync(permits, promise, timeoutInMillis);
CompletableFuture<Boolean> r = tryAcquireAsync(permits, timeoutInMillis);
commandExecutor.transfer(r, f);
}, delay, TimeUnit.MILLISECONDS);
return;
return f;
}
long el = System.currentTimeMillis() - s;
long remains = timeoutInMillis - el;
if (remains <= 0) {
promise.trySuccess(false);
return;
return CompletableFuture.completedFuture(false);
}
CompletableFuture<Boolean> f = new CompletableFuture<>();
if (remains < delay) {
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
promise.trySuccess(false);
f.complete(false);
}, remains, TimeUnit.MILLISECONDS);
} else {
long start = System.currentTimeMillis();
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
long elapsed = System.currentTimeMillis() - start;
if (remains <= elapsed) {
promise.trySuccess(false);
f.complete(false);
return;
}
tryAcquireAsync(permits, promise, remains - elapsed);
CompletableFuture<Boolean> r = tryAcquireAsync(permits, remains - elapsed);
commandExecutor.transfer(r, f);
}, delay, TimeUnit.MILLISECONDS);
}
});
return f;
}).toCompletableFuture();
}
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {

@ -184,7 +184,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
private void poll(String id, StreamMessageId startId) {
readFuture = commandExecutor.readAsync(getRawName(), new CompositeCodec(StringCodec.INSTANCE, codec),
RedisCommands.XREAD_BLOCKING_SINGLE, "BLOCK", 0, "STREAMS", getRawName(), startId);
readFuture.onComplete((res, ex) -> {
readFuture.whenComplete((res, ex) -> {
if (readFuture.isCancelled()) {
return;
}
@ -246,7 +246,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
+ "return r ~= false; ",
Arrays.asList(getRawName(), getSubscribersName(), getMapName(), getCounter(), getTimeout()),
lastId, id, time);
updateFuture.onComplete((re, exc) -> {
updateFuture.whenComplete((re, exc) -> {
if (exc != null) {
if (exc instanceof RedissonShutdownException) {
return;
@ -333,7 +333,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
+ "return 1; ",
Arrays.asList(getTimeout()),
System.currentTimeMillis() + commandExecutor.getConnectionManager().getCfg().getReliableTopicWatchdogTimeout(), subscriberId.get());
future.onComplete((res, e) -> {
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update reliable topic " + getRawName() + " expiration time", e);
return;

Loading…
Cancel
Save