refactoring

pull/4061/head
Nikita Koksharov 3 years ago
parent 45f93eda91
commit cc956fcb53

@ -29,17 +29,13 @@ import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.CompletableFutureWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
/**
*
@ -250,35 +246,17 @@ public abstract class LocalCacheListener {
}
public RFuture<Void> clearLocalCacheAsync() {
RPromise<Void> result = new RedissonPromise<Void>();
cache.clear();
byte[] id = generateId();
RFuture<Long> future = invalidationTopic.publishAsync(new LocalCachedMapClear(instanceId, id, true));
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
CompletionStage<Void> f = future.thenCompose(res -> {
RSemaphore semaphore = getClearSemaphore(id);
semaphore.tryAcquireAsync(res.intValue() - 1, 50, TimeUnit.SECONDS).onComplete((r, ex) -> {
if (ex != null) {
result.tryFailure(ex);
return;
}
semaphore.deleteAsync().onComplete((re, exc) -> {
if (exc != null) {
result.tryFailure(exc);
return;
}
result.trySuccess(null);
});
});
return semaphore.tryAcquireAsync(res.intValue() - 1, 50, TimeUnit.SECONDS)
.thenCompose(r -> {
return semaphore.deleteAsync().thenApply(re -> null);
});
});
return result;
return new CompletableFutureWrapper<>(f);
}
public RTopic getInvalidationTopic() {
@ -328,7 +306,7 @@ public abstract class LocalCacheListener {
return;
}
object.isExistsAsync().onComplete((res, e) -> {
object.isExistsAsync().whenComplete((res, e) -> {
if (e != null) {
log.error("Can't check existance", e);
return;
@ -339,9 +317,9 @@ public abstract class LocalCacheListener {
return;
}
RScoredSortedSet<byte[]> logs = new RedissonScoredSortedSet<byte[]>(ByteArrayCodec.INSTANCE, commandExecutor, getUpdatesLogName(), null);
RScoredSortedSet<byte[]> logs = new RedissonScoredSortedSet<>(ByteArrayCodec.INSTANCE, commandExecutor, getUpdatesLogName(), null);
logs.valueRangeAsync(lastInvalidate, true, Double.POSITIVE_INFINITY, true)
.onComplete((r, ex) -> {
.whenComplete((r, ex) -> {
if (ex != null) {
log.error("Can't load update log", ex);
return;

@ -26,14 +26,14 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.Time;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@ -73,21 +73,13 @@ public class RedisNode implements RedisClusterMaster, RedisClusterSlave, RedisMa
@Override
public RFuture<Boolean> pingAsync(long timeout, TimeUnit timeUnit) {
RPromise<Boolean> result = new RedissonPromise<>();
RFuture<Boolean> f = commandExecutor.readAsync(client, null, RedisCommands.PING_BOOL);
f.onComplete((res, e) -> {
if (e != null) {
result.trySuccess(false);
return;
}
result.trySuccess(res);
});
CompletionStage<Boolean> s = f.exceptionally(e -> false);
commandExecutor.getConnectionManager().newTimeout(t -> {
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout (" + timeUnit.toMillis(timeout) + "ms) for command: PING, Redis client: " + client);
result.tryFailure(ex);
s.toCompletableFuture().completeExceptionally(ex);
}, timeout, timeUnit);
return result;
return new CompletableFutureWrapper<>(s);
}
@Override

@ -27,13 +27,14 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.Time;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
@ -119,35 +120,24 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync {
}
private <T> RFuture<T> executeAsync(T defaultValue, Codec codec, long timeout, RedisCommand<T> command, Object... params) {
RPromise<T> result = new RedissonPromise<>();
CompletionStage<RedisConnection> connectionFuture = client.connectAsync();
connectionFuture.whenComplete((connection, ex) -> {
if (ex != null) {
if (defaultValue != null) {
result.trySuccess(defaultValue);
} else {
result.tryFailure(ex);
}
return;
CompletableFuture<RedisConnection> connectionFuture = client.connectAsync().toCompletableFuture();
CompletableFuture<Object> f = connectionFuture.thenCompose(connection -> {
return connection.async(timeout, codec, command, params);
}).handle((r, e) -> {
if (connectionFuture.isDone() && !connectionFuture.isCompletedExceptionally()) {
connectionFuture.getNow(null).closeAsync();
}
RFuture<T> future = connection.async(timeout, codec, command, params);
future.onComplete((r, e) -> {
connection.closeAsync();
if (e != null) {
if (defaultValue != null) {
result.trySuccess(defaultValue);
} else {
result.tryFailure(e);
}
return;
if (e != null) {
if (defaultValue != null) {
return defaultValue;
}
throw new CompletionException(e);
}
result.trySuccess(r);
});
return r;
});
return result;
return new CompletableFutureWrapper<T>((CompletionStage<T>) f);
}
@Override

@ -40,6 +40,7 @@ import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -115,14 +116,14 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(),
remoteService.getMethodSignature(method), args, optionsCopy, System.currentTimeMillis());
final RFuture<RemoteServiceAck> ackFuture;
CompletableFuture<RemoteServiceAck> ackFuture;
if (optionsCopy.isAckExpected()) {
ackFuture = pollResponse(optionsCopy.getAckTimeoutInMillis(), requestId, false);
} else {
ackFuture = null;
}
final RPromise<RRemoteServiceResponse> responseFuture;
CompletableFuture<RRemoteServiceResponse> responseFuture;
if (optionsCopy.isResultExpected()) {
long timeout = remoteService.getTimeout(optionsCopy.getExecutionTimeoutInMillis(), request);
responseFuture = pollResponse(timeout, requestId, false);
@ -157,7 +158,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
}
if (optionsCopy.isAckExpected()) {
ackFuture.onComplete((ack, ex) -> {
ackFuture.whenComplete((ack, ex) -> {
if (ex != null) {
if (responseFuture != null) {
responseFuture.cancel(false);
@ -169,9 +170,9 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
if (ack == null) {
String ackName = remoteService.getAckName(requestId);
RFuture<RemoteServiceAck> ackFutureAttempt =
CompletionStage<RemoteServiceAck> ackFutureAttempt =
tryPollAckAgainAsync(optionsCopy, ackName, requestId);
ackFutureAttempt.onComplete((re, ex2) -> {
ackFutureAttempt.whenComplete((re, ex2) -> {
if (ex2 != null) {
result.completeExceptionally(ex2);
return;
@ -209,9 +210,9 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
}
private void awaitResultAsync(RemoteInvocationOptions optionsCopy, RemotePromise<Object> result,
String ackName, RFuture<RRemoteServiceResponse> responseFuture) {
String ackName, CompletableFuture<RRemoteServiceResponse> responseFuture) {
RFuture<Boolean> deleteFuture = new RedissonBucket<>(commandExecutor, ackName).deleteAsync();
deleteFuture.onComplete((res, e) -> {
deleteFuture.whenComplete((res, e) -> {
if (e != null) {
result.completeExceptionally(e);
return;
@ -222,13 +223,13 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
}
protected void awaitResultAsync(RemoteInvocationOptions optionsCopy, RemotePromise<Object> result,
RFuture<RRemoteServiceResponse> responseFuture) {
CompletionStage<RRemoteServiceResponse> responseFuture) {
// poll for the response only if expected
if (!optionsCopy.isResultExpected()) {
return;
}
responseFuture.onComplete((res, e) -> {
responseFuture.whenComplete((res, e) -> {
if (e != null) {
result.completeExceptionally(e);
return;
@ -408,7 +409,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
if (!optionsCopy.isResultExpected()) {
RemoteInvocationOptions options = new RemoteInvocationOptions(optionsCopy);
options.expectResultWithin(60, TimeUnit.SECONDS);
RFuture<RRemoteServiceResponse> responseFuture = pollResponse(options.getExecutionTimeoutInMillis(), remotePromise.getRequestId(), false);
CompletionStage<RRemoteServiceResponse> responseFuture = pollResponse(options.getExecutionTimeoutInMillis(), remotePromise.getRequestId(), false);
awaitResultAsync(options, remotePromise, responseFuture);
}
}

@ -24,13 +24,13 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.ResponseEntry.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
@ -70,9 +70,8 @@ public abstract class BaseRemoteProxy {
return requestQueueNameCache.computeIfAbsent(remoteInterface, k -> "{" + name + ":" + k.getName() + "}");
}
protected RFuture<RemoteServiceAck> tryPollAckAgainAsync(RemoteInvocationOptions optionsCopy,
String ackName, RequestId requestId) {
RPromise<RemoteServiceAck> promise = new RedissonPromise<>();
protected CompletionStage<RemoteServiceAck> tryPollAckAgainAsync(RemoteInvocationOptions optionsCopy,
String ackName, RequestId requestId) {
RFuture<Boolean> ackClientsFuture = commandExecutor.evalWriteNoRetryAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[1]);"
@ -80,33 +79,18 @@ public abstract class BaseRemoteProxy {
+ "end;"
+ "redis.call('del', KEYS[1]);"
+ "return 1;",
Arrays.<Object> asList(ackName), optionsCopy.getAckTimeoutInMillis());
ackClientsFuture.onComplete((res, e) -> {
if (e != null) {
promise.tryFailure(e);
return;
}
Arrays.asList(ackName), optionsCopy.getAckTimeoutInMillis());
return ackClientsFuture.thenCompose(res -> {
if (res) {
RPromise<RemoteServiceAck> ackFuture = pollResponse(commandExecutor.getConnectionManager().getConfig().getTimeout(), requestId, true);
ackFuture.onComplete((r, ex) -> {
if (ex != null) {
promise.tryFailure(ex);
return;
}
promise.trySuccess(r);
});
} else {
promise.trySuccess(null);
return pollResponse(commandExecutor.getConnectionManager().getConfig().getTimeout(), requestId, true);
}
return CompletableFuture.completedFuture(null);
});
return promise;
}
protected final <T extends RRemoteServiceResponse> RPromise<T> pollResponse(long timeout,
protected final <T extends RRemoteServiceResponse> CompletableFuture<T> pollResponse(long timeout,
RequestId requestId, boolean insertFirst) {
RPromise<T> responseFuture = new RedissonPromise<T>();
CompletableFuture<T> responseFuture = new CompletableFuture<T>();
ResponseEntry entry;
synchronized (responses) {
@ -125,13 +109,14 @@ public abstract class BaseRemoteProxy {
} else {
list.add(res);
}
}
pollResponse(entry);
return responseFuture;
}
private <T extends RRemoteServiceResponse> ScheduledFuture<?> createResponseTimeout(long timeout, RequestId requestId, RPromise<T> responseFuture) {
private <T extends RRemoteServiceResponse> ScheduledFuture<?> createResponseTimeout(long timeout, RequestId requestId, CompletableFuture<T> responseFuture) {
return commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
@ -142,7 +127,7 @@ public abstract class BaseRemoteProxy {
}
RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms");
if (!responseFuture.tryFailure(ex)) {
if (!responseFuture.completeExceptionally(ex)) {
return;
}
@ -159,8 +144,8 @@ public abstract class BaseRemoteProxy {
}, timeout, TimeUnit.MILLISECONDS);
}
private <T extends RRemoteServiceResponse> void addCancelHandling(RequestId requestId, RPromise<T> responseFuture) {
responseFuture.onComplete((res, ex) -> {
private <T extends RRemoteServiceResponse> void addCancelHandling(RequestId requestId, CompletableFuture<T> responseFuture) {
responseFuture.whenComplete((res, ex) -> {
if (!responseFuture.isCancelled()) {
return;
}
@ -201,7 +186,7 @@ public abstract class BaseRemoteProxy {
RBlockingQueue<RRemoteServiceResponse> queue = getBlockingQueue(responseQueueName, codec);
RFuture<RRemoteServiceResponse> future = queue.takeAsync();
future.onComplete(createResponseListener());
future.whenComplete(createResponseListener());
}
private BiConsumer<RRemoteServiceResponse, Throwable> createResponseListener() {
@ -210,8 +195,8 @@ public abstract class BaseRemoteProxy {
log.error("Can't get response from " + responseQueueName, e);
return;
}
RPromise<RRemoteServiceResponse> promise;
CompletableFuture<RRemoteServiceResponse> promise;
synchronized (responses) {
ResponseEntry entry = responses.get(responseQueueName);
if (entry == null) {
@ -222,7 +207,7 @@ public abstract class BaseRemoteProxy {
List<Result> list = entry.getResponses().get(key);
if (list == null) {
RBlockingQueue<RRemoteServiceResponse> responseQueue = getBlockingQueue(responseQueueName, codec);
responseQueue.takeAsync().onComplete(createResponseListener());
responseQueue.takeAsync().whenComplete(createResponseListener());
return;
}
@ -238,12 +223,12 @@ public abstract class BaseRemoteProxy {
responses.remove(responseQueueName, entry);
} else {
RBlockingQueue<RRemoteServiceResponse> responseQueue = getBlockingQueue(responseQueueName, codec);
responseQueue.takeAsync().onComplete(createResponseListener());
responseQueue.takeAsync().whenComplete(createResponseListener());
}
}
if (promise != null) {
promise.trySuccess(response);
promise.complete(response);
}
};
}

@ -157,7 +157,7 @@ public abstract class BaseRemoteService {
RMap<String, T> canceledRequests = getMap(mapName);
RFuture<T> future = canceledRequests.removeAsync(requestId.toString());
future.onComplete((request, ex) -> {
future.whenComplete((request, ex) -> {
if (cancelRequest.isDone()) {
return;
}

@ -18,11 +18,10 @@ package org.redisson.remote;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.misc.RPromise;
/**
*
* @author Nikita Koksharov
@ -31,18 +30,18 @@ import org.redisson.misc.RPromise;
public class ResponseEntry {
public static class Result {
private final RPromise<? extends RRemoteServiceResponse> promise;
private final CompletableFuture<? extends RRemoteServiceResponse> promise;
private final ScheduledFuture<?> responseTimeoutFuture;
public Result(RPromise<? extends RRemoteServiceResponse> promise, ScheduledFuture<?> responseTimeoutFuture) {
public Result(CompletableFuture<? extends RRemoteServiceResponse> promise, ScheduledFuture<?> responseTimeoutFuture) {
super();
this.promise = promise;
this.responseTimeoutFuture = responseTimeoutFuture;
}
public <T extends RRemoteServiceResponse> RPromise<T> getPromise() {
return (RPromise<T>) promise;
public <T extends RRemoteServiceResponse> CompletableFuture<T> getPromise() {
return (CompletableFuture<T>) promise;
}
public ScheduledFuture<?> getResponseTimeoutFuture() {

@ -16,13 +16,11 @@
package org.redisson.remote;
import org.redisson.RedissonBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.RPromise;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
@ -68,14 +66,14 @@ public class SyncRemoteProxy extends BaseRemoteProxy {
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(),
remoteService.getMethodSignature(method), args, optionsCopy, System.currentTimeMillis());
final RFuture<RemoteServiceAck> ackFuture;
CompletableFuture<RemoteServiceAck> ackFuture;
if (optionsCopy.isAckExpected()) {
ackFuture = pollResponse(optionsCopy.getAckTimeoutInMillis(), requestId, false);
} else {
ackFuture = null;
}
final RPromise<RRemoteServiceResponse> responseFuture;
CompletableFuture<RRemoteServiceResponse> responseFuture;
if (optionsCopy.isResultExpected()) {
long timeout = remoteService.getTimeout(optionsCopy.getExecutionTimeoutInMillis(), request);
responseFuture = pollResponse(timeout, requestId, false);
@ -118,7 +116,7 @@ public class SyncRemoteProxy extends BaseRemoteProxy {
// skip
}
if (ack == null) {
RFuture<RemoteServiceAck> ackFutureAttempt =
CompletionStage<RemoteServiceAck> ackFutureAttempt =
tryPollAckAgainAsync(optionsCopy, ackName, requestId);
try {
ack = ackFutureAttempt.toCompletableFuture().get(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);

Loading…
Cancel
Save