refactoring

pull/4031/head
Nikita Koksharov 3 years ago
parent 8a3943906a
commit 9d1643e75f

@ -31,10 +31,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
/**
*
@ -122,7 +119,7 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
Map<RedisConnection, RFuture<String>> result = new ConcurrentHashMap<>(clients.size());
CountDownLatch latch = new CountDownLatch(clients.size());
for (RedisClientEntry entry : clients) {
CompletableFuture<RedisConnection> f = entry.getClient().connectAsync();
CompletionStage<RedisConnection> f = entry.getClient().connectAsync();
f.whenComplete((c, e) -> {
if (c != null) {
RFuture<String> r = c.async(timeUnit.toMillis(timeout), RedisCommands.PING);

@ -32,8 +32,10 @@ import io.netty.util.NetUtil;
import io.netty.util.Timer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import org.redisson.api.RFuture;
import org.redisson.client.handler.RedisChannelInitializer;
import org.redisson.client.handler.RedisChannelInitializer.Type;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RedisURI;
import java.net.InetAddress;
@ -149,7 +151,7 @@ public final class RedisClient {
public RedisConnection connect() {
try {
return connectAsync().join();
return connectAsync().toCompletableFuture().join();
} catch (CompletionException e) {
if (e.getCause() instanceof RedisException) {
throw (RedisException) e.getCause();
@ -196,9 +198,9 @@ public final class RedisClient {
return promise;
}
public CompletableFuture<RedisConnection> connectAsync() {
public RFuture<RedisConnection> connectAsync() {
CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();
return addrFuture.thenCompose(res -> {
CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {
CompletableFuture<RedisConnection> r = new CompletableFuture<>();
ChannelFuture channelFuture = bootstrap.connect(res);
channelFuture.addListener(new ChannelFutureListener() {
@ -238,11 +240,12 @@ public final class RedisClient {
});
return r;
});
return new CompletableFutureWrapper<>(f);
}
public RedisPubSubConnection connectPubSub() {
try {
return connectPubSubAsync().join();
return connectPubSubAsync().toCompletableFuture().join();
} catch (CompletionException e) {
if (e.getCause() instanceof RedisException) {
throw (RedisException) e.getCause();
@ -252,9 +255,9 @@ public final class RedisClient {
}
}
public CompletableFuture<RedisPubSubConnection> connectPubSubAsync() {
public RFuture<RedisPubSubConnection> connectPubSubAsync() {
CompletableFuture<InetSocketAddress> nameFuture = resolveAddr();
return nameFuture.thenCompose(res -> {
CompletableFuture<RedisPubSubConnection> f = nameFuture.thenCompose(res -> {
CompletableFuture<RedisPubSubConnection> r = new CompletableFuture<>();
ChannelFuture channelFuture = pubSubBootstrap.connect(res);
channelFuture.addListener(new ChannelFutureListener() {
@ -294,18 +297,19 @@ public final class RedisClient {
});
return r;
});
return new CompletableFutureWrapper<>(f);
}
public void shutdown() {
shutdownAsync().join();
shutdownAsync().toCompletableFuture().join();
}
public CompletableFuture<Void> shutdownAsync() {
public RFuture<Void> shutdownAsync() {
shutdown = true;
CompletableFuture<Void> result = new CompletableFuture<>();
if (channels.isEmpty() || config.getGroup().isShuttingDown()) {
shutdown(result);
return result;
return new CompletableFutureWrapper<>(result);
}
ChannelGroupFuture channelsFuture = channels.newCloseFuture();
@ -327,8 +331,8 @@ public final class RedisClient {
connection.closeAsync();
}
}
return result;
return new CompletableFutureWrapper<>(result);
}
public boolean isShutdown() {

@ -33,7 +33,6 @@ import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.connection.*;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.AsyncCountDownLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
@ -88,9 +87,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
List<String> failedMasters = new ArrayList<String>();
for (String address : cfg.getNodeAddresses()) {
RedisURI addr = new RedisURI(address);
CompletableFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
try {
RedisConnection connection = connectionFuture.join();
RedisConnection connection = connectionFuture.toCompletableFuture().join();
if (cfg.getNodeAddresses().size() == 1 && !addr.isIP()) {
configEndpointHostName = addr.getHost();
@ -111,8 +110,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
lastClusterNode = addr;
RFuture<Collection<ClusterPartition>> partitionsFuture = parsePartitions(nodes);
Collection<ClusterPartition> partitions = partitionsFuture.syncUninterruptibly().getNow();
CompletableFuture<Collection<ClusterPartition>> partitionsFuture = parsePartitions(nodes);
Collection<ClusterPartition> partitions = partitionsFuture.join();
List<CompletableFuture<Void>> masterFutures = new ArrayList<>();
for (ClusterPartition partition : partitions) {
if (partition.isMasterFail()) {
@ -278,7 +277,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return result;
}
CompletableFuture<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName);
CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName);
connectionFuture.whenComplete((connection, ex1) -> {
if (ex1 != null) {
log.error("Can't connect to master: {} with slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
@ -419,7 +418,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return;
}
RedisURI uri = iterator.next();
CompletableFuture<RedisConnection> connectionFuture = connectToNode(cfg, uri, configEndpointHostName);
CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, uri, configEndpointHostName);
connectionFuture.whenComplete((connection, e) -> {
if (e != null) {
lastException.set(e);
@ -461,8 +460,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
log.debug("cluster nodes state got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
}
RFuture<Collection<ClusterPartition>> newPartitionsFuture = parsePartitions(nodes);
newPartitionsFuture.onComplete((newPartitions, ex) -> {
CompletableFuture<Collection<ClusterPartition>> newPartitionsFuture = parsePartitions(nodes);
newPartitionsFuture.whenComplete((newPartitions, ex) -> {
RFuture<Void> masterFuture = checkMasterNodesChange(cfg, newPartitions);
checkSlaveNodesChange(newPartitions);
masterFuture.onComplete((res, exc) -> {
@ -762,10 +761,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return natMapper.map(address);
}
private RFuture<Collection<ClusterPartition>> parsePartitions(List<ClusterNodeInfo> nodes) {
private CompletableFuture<Collection<ClusterPartition>> parsePartitions(List<ClusterNodeInfo> nodes) {
Map<String, ClusterPartition> partitions = new ConcurrentHashMap<>();
AsyncCountDownLatch latch = new AsyncCountDownLatch();
int counter = 0;
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (ClusterNodeInfo clusterNodeInfo : nodes) {
if (clusterNodeInfo.containsFlag(Flag.NOADDR)
|| clusterNodeInfo.containsFlag(Flag.HANDSHAKE)
@ -787,13 +785,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
continue;
}
RFuture<RedisURI> ipFuture = resolveIP(clusterNodeInfo.getAddress());
counter++;
ipFuture.onComplete((address, e) -> {
if (e != null) {
latch.countDown();
return;
}
CompletableFuture<RedisURI> ipFuture = resolveIP(clusterNodeInfo.getAddress());
CompletableFuture<Void> f = ipFuture.thenAccept(address -> {
if (clusterNodeInfo.containsFlag(Flag.SLAVE)) {
ClusterPartition masterPartition = partitions.computeIfAbsent(masterId, k -> new ClusterPartition(masterId));
@ -815,22 +808,21 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
masterPartition.setMasterFail(true);
}
}
latch.countDown();
});
futures.add(f);
}
RPromise<Collection<ClusterPartition>> result = new RedissonPromise<>();
latch.latch(() -> {
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return future.handle((r, e) -> {
addCascadeSlaves(partitions.values());
List<ClusterPartition> ps = partitions.values()
.stream()
.filter(cp -> cp.getType() == Type.MASTER
&& cp.getMasterAddress() != null)
.collect(Collectors.toList());
result.trySuccess(ps);
}, counter);
return result;
.stream()
.filter(cp -> cp.getType() == Type.MASTER
&& cp.getMasterAddress() != null)
.collect(Collectors.toList());
return ps;
});
}
private void addCascadeSlaves(Collection<ClusterPartition> partitions) {

@ -34,15 +34,11 @@ import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.AsyncCountDownLatch;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -408,38 +404,28 @@ public class CommandBatchService extends CommandAsyncService {
return;
}
RPromise<Map<MasterSlaveEntry, List<Object>>> mainPromise = new RedissonPromise<>();
Map<MasterSlaveEntry, List<Object>> result = new ConcurrentHashMap<>();
CountableListener<Map<MasterSlaveEntry, List<Object>>> listener = new CountableListener<>(mainPromise, result);
listener.setCounter(connections.size());
List<CompletableFuture<Void>> futures = new ArrayList<>(commands.size());
for (Map.Entry<MasterSlaveEntry, Entry> entry : commands.entrySet()) {
RPromise<List<Object>> execPromise = new RedissonPromise<>();
async(entry.getValue().isReadOnlyMode(), new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC,
new Object[] {}, execPromise, false, false);
execPromise.onComplete((r, ex) -> {
if (ex != null) {
mainPromise.tryFailure(ex);
return;
}
CompletionStage<Void> f = execPromise.thenCompose(r -> {
BatchCommandData<?, Integer> lastCommand = (BatchCommandData<?, Integer>) entry.getValue().getCommands().peekLast();
result.put(entry.getKey(), r);
if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) {
lastCommand.getPromise().onComplete((res, e) -> {
if (e != null) {
mainPromise.tryFailure(e);
return;
}
execPromise.onComplete(listener);
});
} else {
execPromise.onComplete(listener);
return lastCommand.getPromise().thenApply(i -> null);
}
return CompletableFuture.completedFuture(null);
});
futures.add(f.toCompletableFuture());
}
mainPromise.onComplete((res, ex) -> {
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
future.whenComplete((res, ex) -> {
executed.set(true);
if (ex != null) {
resultPromise.tryFailure(ex);
@ -447,7 +433,7 @@ public class CommandBatchService extends CommandAsyncService {
}
try {
for (java.util.Map.Entry<MasterSlaveEntry, List<Object>> entry : res.entrySet()) {
for (java.util.Map.Entry<MasterSlaveEntry, List<Object>> entry : result.entrySet()) {
Entry commandEntry = commands.get(entry.getKey());
Iterator<Object> resultIter = entry.getValue().iterator();
for (BatchCommandData<?, ?> data : commandEntry.getCommands()) {

@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
@ -416,8 +417,8 @@ public class RedisExecutor<V, R> {
onException();
RFuture<RedisURI> ipAddrFuture = connectionManager.resolveIP(ex.getUrl());
ipAddrFuture.onComplete((ip, e) -> {
CompletableFuture<RedisURI> ipAddrFuture = connectionManager.resolveIP(ex.getUrl());
ipAddrFuture.whenComplete((ip, e) -> {
if (e != null) {
handleError(connectionFuture, e);
return;
@ -433,8 +434,8 @@ public class RedisExecutor<V, R> {
onException();
RFuture<RedisURI> ipAddrFuture = connectionManager.resolveIP(ex.getUrl());
ipAddrFuture.onComplete((ip, e) -> {
CompletableFuture<RedisURI> ipAddrFuture = connectionManager.resolveIP(ex.getUrl());
ipAddrFuture.whenComplete((ip, e) -> {
if (e != null) {
handleError(connectionFuture, e);
return;

@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
@ -120,7 +121,7 @@ public class ClientConnectionsEntry {
public CompletableFuture<Void> shutdownAsync() {
connectionManager.getConnectionWatcher().remove(this);
return client.shutdownAsync();
return client.shutdownAsync().toCompletableFuture();
}
public RedisClient getClient() {
@ -187,8 +188,8 @@ public class ClientConnectionsEntry {
freeConnections.add(connection);
}
public CompletableFuture<RedisConnection> connect() {
CompletableFuture<RedisConnection> future = client.connectAsync();
public CompletionStage<RedisConnection> connect() {
CompletionStage<RedisConnection> future = client.connectAsync();
return future.whenComplete((conn, e) -> {
if (e != null) {
return;
@ -222,8 +223,8 @@ public class ClientConnectionsEntry {
connectionManager.getConnectionEventsHub().fireConnect(conn.getRedisClient().getAddr());
}
public CompletableFuture<RedisPubSubConnection> connectPubSub() {
CompletableFuture<RedisPubSubConnection> future = client.connectPubSubAsync();
public CompletionStage<RedisPubSubConnection> connectPubSub() {
CompletionStage<RedisPubSubConnection> future = client.connectPubSubAsync();
return future.whenComplete((conn, e) -> {
if (e != null) {
return;

@ -35,6 +35,7 @@ import org.redisson.pubsub.PublishSubscribeService;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@ -47,7 +48,7 @@ public interface ConnectionManager {
RedisURI applyNatMap(RedisURI address);
RFuture<RedisURI> resolveIP(RedisURI address);
CompletableFuture<RedisURI> resolveIP(RedisURI address);
String getId();

@ -47,7 +47,6 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.*;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.PublishSubscribeService;
@ -231,11 +230,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
protected final CompletableFuture<RedisConnection> connectToNode(BaseConfig<?> cfg, RedisURI addr, String sslHostname) {
protected final CompletionStage<RedisConnection> connectToNode(BaseConfig<?> cfg, RedisURI addr, String sslHostname) {
return connectToNode(NodeType.MASTER, cfg, addr, sslHostname);
}
protected final CompletableFuture<RedisConnection> connectToNode(NodeType type, BaseConfig<?> cfg, RedisURI addr, String sslHostname) {
protected final CompletionStage<RedisConnection> connectToNode(NodeType type, BaseConfig<?> cfg, RedisURI addr, String sslHostname) {
RedisConnection conn = nodeConnections.get(addr);
if (conn != null) {
if (!conn.isActive()) {
@ -246,7 +245,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
RedisClient client = createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname);
CompletableFuture<RedisConnection> future = client.connectAsync();
CompletionStage<RedisConnection> future = client.connectAsync();
return future.thenCompose(connection -> {
if (connection.isActive()) {
if (!addr.isIP()) {
@ -697,31 +696,31 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public RFuture<RedisURI> resolveIP(RedisURI address) {
public CompletableFuture<RedisURI> resolveIP(RedisURI address) {
return resolveIP(address.getScheme(), address);
}
protected RFuture<RedisURI> resolveIP(String scheme, RedisURI address) {
protected CompletableFuture<RedisURI> resolveIP(String scheme, RedisURI address) {
if (address.isIP()) {
RedisURI addr = applyNatMap(address);
return RedissonPromise.newSucceededFuture(addr);
return CompletableFuture.completedFuture(addr);
}
RPromise<RedisURI> result = new RedissonPromise<>();
CompletableFuture<RedisURI> result = new CompletableFuture<>();
AddressResolver<InetSocketAddress> resolver = resolverGroup.getResolver(getGroup().next());
InetSocketAddress addr = InetSocketAddress.createUnresolved(address.getHost(), address.getPort());
Future<InetSocketAddress> future = resolver.resolve(addr);
future.addListener((FutureListener<InetSocketAddress>) f -> {
if (!f.isSuccess()) {
log.error("Unable to resolve " + address, f.cause());
result.tryFailure(f.cause());
result.completeExceptionally(f.cause());
return;
}
InetSocketAddress s = f.getNow();
RedisURI uri = new RedisURI(scheme + "://" + s.getAddress().getHostAddress() + ":" + address.getPort());
uri = applyNatMap(uri);
result.trySuccess(uri);
result.complete(uri);
});
return result;
}

@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -71,10 +72,10 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
for (String address : cfg.getNodeAddresses()) {
RedisURI addr = new RedisURI(address);
CompletableFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
RedisConnection connection = null;
try {
connection = connectionFuture.join();
connection = connectionFuture.toCompletableFuture().join();
} catch (Exception e) {
// skip
}
@ -161,7 +162,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
}
private void checkNode(AsyncCountDownLatch latch, RedisURI uri, ReplicatedServersConfig cfg, Set<InetSocketAddress> slaveIPs) {
CompletableFuture<RedisConnection> connectionFuture = connectToNode(cfg, uri, uri.getHost());
CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, uri, uri.getHost());
connectionFuture.whenComplete((connection, exc) -> {
if (exc != null) {
log.error(exc.getMessage(), exc);

@ -122,7 +122,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
throw new RedisConnectionException("Master node is undefined! SENTINEL GET-MASTER-ADDR-BY-NAME command returns empty result!");
}
RedisURI masterHost = resolveIP(scheme, master).syncUninterruptibly().getNow();
RedisURI masterHost = resolveIP(scheme, master).join();
this.config.setMasterAddress(masterHost.toString());
currentMaster.set(masterHost);
log.info("master: {} added", masterHost);
@ -138,7 +138,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String flags = map.getOrDefault("flags", "");
String masterLinkStatus = map.getOrDefault("master-link-status", "");
RedisURI uri = resolveIP(host, port).syncUninterruptibly().getNow();
RedisURI uri = resolveIP(host, port).join();
this.config.addSlaveAddress(uri.toString());
log.debug("slave {} state: {}", uri, map);
@ -160,14 +160,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = map.get("ip");
String port = map.get("port");
RedisURI uri = resolveIP(ip, port).syncUninterruptibly().getNow();
CompletableFuture<Void> future = registerSentinel(uri, this.config, null);
connectionFutures.add(future);
RedisURI uri = resolveIP(ip, port).join();
CompletionStage<Void> future = registerSentinel(uri, this.config, null);
connectionFutures.add(future.toCompletableFuture());
}
RedisURI sentinelIp = toURI(connection.getRedisClient().getAddr());
CompletableFuture<Void> f = registerSentinel(sentinelIp, this.config, null);
connectionFutures.add(f);
CompletionStage<Void> f = registerSentinel(sentinelIp, this.config, null);
connectionFutures.add(f.toCompletableFuture());
CompletableFuture<Void> future = CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[0]));
try {
@ -181,6 +181,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
stopThreads();
throw e;
} catch (Exception e) {
if (e instanceof CompletionException) {
e = (Exception) e.getCause();
}
lastException = e;
log.warn(e.getMessage());
} finally {
@ -338,7 +341,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
RedisClient client = iterator.next();
RedisURI addr = toURI(client.getAddr());
CompletableFuture<RedisConnection> connectionFuture = connectToNode(NodeType.SENTINEL, cfg, addr, null);
CompletionStage<RedisConnection> connectionFuture = connectToNode(NodeType.SENTINEL, cfg, addr, null);
connectionFuture.whenComplete((connection, e) -> {
if (e != null) {
lastException.set(e);
@ -447,16 +450,16 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String masterHost = map.get("master-host");
String masterPort = map.get("master-port");
RFuture<RedisURI> slaveAddrFuture = resolveIP(host, port);
RFuture<RedisURI> masterAddrFuture;
CompletableFuture<RedisURI> slaveAddrFuture = resolveIP(host, port);
CompletableFuture<RedisURI> masterAddrFuture;
if ("?".equals(masterHost)) {
masterAddrFuture = RedissonPromise.newSucceededFuture(null);
masterAddrFuture = CompletableFuture.completedFuture(null);
} else {
masterAddrFuture = resolveIP(masterHost, masterPort);
}
CompletableFuture<Void> resolvedFuture = CompletableFuture.allOf(masterAddrFuture.toCompletableFuture(),
slaveAddrFuture.toCompletableFuture());
CompletableFuture<Void> resolvedFuture = CompletableFuture.allOf(masterAddrFuture,
slaveAddrFuture);
futures.add(resolvedFuture
.whenComplete((r, exc) -> {
if (exc != null) {
@ -464,8 +467,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
})
.thenCompose(res -> {
RedisURI slaveAddr = slaveAddrFuture.getNow();
RedisURI masterAddr = masterAddrFuture.getNow();
RedisURI slaveAddr = slaveAddrFuture.getNow(null);
RedisURI masterAddr = masterAddrFuture.getNow(null);
if (isSlaveDown(flags, masterLinkStatus)) {
slaveDown(slaveAddr);
return CompletableFuture.completedFuture(res);
@ -552,7 +555,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return disconnectedSlaves;
}
private CompletableFuture<Void> registerSentinel(RedisURI addr, MasterSlaveServersConfig c, String sslHostname) {
private CompletionStage<Void> registerSentinel(RedisURI addr, MasterSlaveServersConfig c, String sslHostname) {
boolean isHostname = NetUtil.createByteArrayFromIpAddressString(addr.getHost()) == null;
if (!isHostname) {
RedisClient sentinel = sentinels.get(addr);
@ -572,7 +575,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
CompletableFuture<RedisConnection> f = client.connectAsync();
CompletionStage<RedisConnection> f = client.connectAsync();
return f.thenApply(resp -> {
if (sentinels.putIfAbsent(ipAddr, client) == null) {
log.info("sentinel: {} added", ipAddr);
@ -582,7 +585,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
});
}
private RFuture<RedisURI> resolveIP(String host, String port) {
private CompletableFuture<RedisURI> resolveIP(String host, String port) {
RedisURI uri = toURI(host, port);
return resolveIP(uri);
}

@ -37,6 +37,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -243,8 +244,8 @@ abstract class ConnectionPool<T extends RedisConnection> {
return (T) entry.pollConnection(command);
}
protected CompletableFuture<T> connect(ClientConnectionsEntry entry) {
return (CompletableFuture<T>) entry.connect();
protected CompletionStage<T> connect(ClientConnectionsEntry entry) {
return (CompletionStage<T>) entry.connect();
}
private void connectTo(ClientConnectionsEntry entry, RPromise<T> promise, RedisCommand<?> command) {
@ -269,7 +270,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
private void createConnection(ClientConnectionsEntry entry, RPromise<T> promise) {
CompletableFuture<T> connFuture = connect(entry);
CompletionStage<T> connFuture = connect(entry);
connFuture.whenComplete((conn, e) -> {
if (e != null) {
promiseFailure(entry, promise, e);
@ -349,7 +350,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
}
CompletableFuture<RedisConnection> connectionFuture = entry.getClient().connectAsync();
CompletionStage<RedisConnection> connectionFuture = entry.getClient().connectAsync();
connectionFuture.whenComplete((c, e) -> {
synchronized (entry) {
if (entry.getFreezeReason() != FreezeReason.RECONNECT) {

@ -24,7 +24,7 @@ import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
/**
* Connection pool for Publish / Subscribe
@ -53,7 +53,7 @@ public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection>
}
@Override
protected CompletableFuture<RedisPubSubConnection> connect(ClientConnectionsEntry entry) {
protected CompletionStage<RedisPubSubConnection> connect(ClientConnectionsEntry entry) {
return entry.connectPubSub();
}

@ -0,0 +1,371 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import org.redisson.api.RFuture;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
/**
*
*
* @author Nikita Koksharov
* @param <V> value type
*/
public class CompletableFutureWrapper<V> implements RFuture<V> {
private final CompletableFuture<V> future;
private CompletableFuture<V> lastFuture;
public CompletableFutureWrapper(CompletableFuture<V> future) {
this.future = future;
this.lastFuture = future;
}
@Override
public <U> CompletionStage<U> thenApply(Function<? super V, ? extends U> fn) {
return future.thenApply(fn);
}
@Override
public <U> CompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn) {
return future.thenApplyAsync(fn);
}
@Override
public <U> CompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn, Executor executor) {
return future.thenApplyAsync(fn, executor);
}
@Override
public CompletionStage<Void> thenAccept(Consumer<? super V> action) {
return future.thenAccept(action);
}
@Override
public CompletionStage<Void> thenAcceptAsync(Consumer<? super V> action) {
return future.thenAcceptAsync(action);
}
@Override
public CompletionStage<Void> thenAcceptAsync(Consumer<? super V> action, Executor executor) {
return future.thenAcceptAsync(action, executor);
}
@Override
public CompletionStage<Void> thenRun(Runnable action) {
return future.thenRun(action);
}
@Override
public CompletionStage<Void> thenRunAsync(Runnable action) {
return future.thenRunAsync(action);
}
@Override
public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor) {
return future.thenRunAsync(action, executor);
}
@Override
public <U, V1> CompletionStage<V1> thenCombine(CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn) {
return future.thenCombine(other, fn);
}
@Override
public <U, V1> CompletionStage<V1> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn) {
return future.thenCombineAsync(other, fn);
}
@Override
public <U, V1> CompletionStage<V1> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn, Executor executor) {
return future.thenCombineAsync(other, fn, executor);
}
@Override
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action) {
return future.thenAcceptBoth(other, action);
}
@Override
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action) {
return future.thenAcceptBothAsync(other, action);
}
@Override
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action, Executor executor) {
return future.thenAcceptBothAsync(other, action, executor);
}
@Override
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action) {
return future.runAfterBoth(other, action);
}
@Override
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {
return future.runAfterBothAsync(other, action);
}
@Override
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) {
return future.runAfterBothAsync(other, action, executor);
}
@Override
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends V> other, Function<? super V, U> fn) {
return future.applyToEither(other, fn);
}
@Override
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends V> other, Function<? super V, U> fn) {
return future.applyToEitherAsync(other, fn);
}
@Override
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends V> other, Function<? super V, U> fn, Executor executor) {
return future.applyToEitherAsync(other, fn, executor);
}
@Override
public CompletionStage<Void> acceptEither(CompletionStage<? extends V> other, Consumer<? super V> action) {
return future.acceptEither(other, action);
}
@Override
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends V> other, Consumer<? super V> action) {
return future.acceptEitherAsync(other, action);
}
@Override
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends V> other, Consumer<? super V> action, Executor executor) {
return future.acceptEitherAsync(other, action, executor);
}
@Override
public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action) {
return future.runAfterEither(other, action);
}
@Override
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) {
return future.runAfterEitherAsync(other, action);
}
@Override
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor) {
return future.runAfterEitherAsync(other, action, executor);
}
@Override
public <U> CompletionStage<U> thenCompose(Function<? super V, ? extends CompletionStage<U>> fn) {
return future.thenCompose(fn);
}
@Override
public <U> CompletionStage<U> thenComposeAsync(Function<? super V, ? extends CompletionStage<U>> fn) {
return future.thenComposeAsync(fn);
}
@Override
public <U> CompletionStage<U> thenComposeAsync(Function<? super V, ? extends CompletionStage<U>> fn, Executor executor) {
return future.thenComposeAsync(fn, executor);
}
@Override
public <U> CompletionStage<U> handle(BiFunction<? super V, Throwable, ? extends U> fn) {
return future.handle(fn);
}
@Override
public <U> CompletionStage<U> handleAsync(BiFunction<? super V, Throwable, ? extends U> fn) {
return future.handleAsync(fn);
}
@Override
public <U> CompletionStage<U> handleAsync(BiFunction<? super V, Throwable, ? extends U> fn, Executor executor) {
return future.handleAsync(fn, executor);
}
@Override
public CompletionStage<V> whenComplete(BiConsumer<? super V, ? super Throwable> action) {
return future.whenComplete(action);
}
@Override
public CompletionStage<V> whenCompleteAsync(BiConsumer<? super V, ? super Throwable> action) {
return future.whenCompleteAsync(action);
}
@Override
public CompletionStage<V> whenCompleteAsync(BiConsumer<? super V, ? super Throwable> action, Executor executor) {
return future.whenCompleteAsync(action, executor);
}
@Override
public CompletionStage<V> exceptionally(Function<Throwable, ? extends V> fn) {
return future.exceptionally(fn);
}
@Override
public CompletableFuture<V> toCompletableFuture() {
return future;
}
public V getNow(V valueIfAbsent) {
return future.getNow(valueIfAbsent);
}
public boolean complete(V value) {
return future.complete(value);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return future.isCancelled();
}
@Override
public boolean isDone() {
return future.isDone();
}
@Override
public V get() throws InterruptedException, ExecutionException {
return future.get();
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return future.get(timeout, unit);
}
@Override
public boolean isSuccess() {
return future.isDone() && !future.isCompletedExceptionally();
}
@Override
public Throwable cause() {
if (future.isDone()) {
try {
future.getNow(null);
} catch (CompletionException e) {
return e.getCause();
} catch (CancellationException e) {
return e;
}
}
return null;
}
@Override
public V getNow() {
return future.getNow(null);
}
@Override
public V join() {
return future.join();
}
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
// skip
}
return future.isDone();
}
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
return await(timeoutMillis, TimeUnit.MILLISECONDS);
}
@Override
public RFuture<V> sync() throws InterruptedException {
try {
future.get();
return this;
} catch (ExecutionException e) {
throw (RuntimeException) e.getCause();
}
}
@Override
public RFuture<V> syncUninterruptibly() {
try {
future.join();
return this;
} catch (CompletionException e) {
throw (RuntimeException) e.getCause();
}
}
@Override
public RFuture<V> await() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
// skip
}
return this;
}
@Override
public RFuture<V> awaitUninterruptibly() {
try {
join();
} catch (Exception e) {
// skip
}
return this;
}
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
try {
future.get(timeout, unit);
} catch (ExecutionException | TimeoutException e) {
// skip
} catch (InterruptedException e) {
awaitUninterruptibly(timeout, unit);
}
return future.isDone();
}
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
return awaitUninterruptibly(timeoutMillis, TimeUnit.MILLISECONDS);
}
@Override
public void onComplete(BiConsumer<? super V, ? super Throwable> action) {
lastFuture = lastFuture.whenComplete(action);
}
}

@ -30,10 +30,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
/**
*
@ -116,7 +113,7 @@ public class RedissonBaseNodes implements BaseRedisNodes {
Map<RedisConnection, RFuture<String>> result = new ConcurrentHashMap<>(clients.size());
CountDownLatch latch = new CountDownLatch(clients.size());
for (RedisNode entry : clients) {
CompletableFuture<RedisConnection> f = entry.getClient().connectAsync();
CompletionStage<RedisConnection> f = entry.getClient().connectAsync();
f.whenComplete((c, e) -> {
if (c != null) {
RFuture<String> r = c.async(timeUnit.toMillis(timeout), RedisCommands.PING);

@ -34,7 +34,7 @@ import org.redisson.misc.RedissonPromise;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
@ -120,7 +120,7 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync {
private <T> RFuture<T> executeAsync(T defaultValue, Codec codec, long timeout, RedisCommand<T> command, Object... params) {
RPromise<T> result = new RedissonPromise<>();
CompletableFuture<RedisConnection> connectionFuture = client.connectAsync();
CompletionStage<RedisConnection> connectionFuture = client.connectAsync();
connectionFuture.whenComplete((connection, ex) -> {
if (ex != null) {
if (defaultValue != null) {

@ -15,40 +15,14 @@
*/
package org.redisson.transaction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.RedissonBatch;
import org.redisson.RedissonLocalCachedMap;
import org.redisson.RedissonObject;
import org.redisson.RedissonTopic;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RBucket;
import org.redisson.api.RBuckets;
import org.redisson.api.RFuture;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
import org.redisson.api.RMultimapCacheAsync;
import org.redisson.api.RSet;
import org.redisson.api.RSetCache;
import org.redisson.api.RTopic;
import org.redisson.api.RTopicAsync;
import org.redisson.api.RTransaction;
import org.redisson.api.TransactionOptions;
import org.redisson.api.*;
import org.redisson.api.listener.MessageListener;
import org.redisson.cache.LocalCachedMapDisable;
import org.redisson.cache.LocalCachedMapDisabledKey;
@ -58,15 +32,17 @@ import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CountableListener;
import org.redisson.misc.AsyncCountDownLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.map.MapOperation;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
@ -464,33 +440,31 @@ public class RedissonTransaction implements RTransaction {
result.tryFailure(e);
return;
}
CountableListener<Map<HashKey, HashValue>> listener =
new CountableListener<>(result, hashes, hashes.size());
RPromise<Void> subscriptionFuture = new RedissonPromise<>();
CountableListener<Void> subscribedFutures = new CountableListener<>(subscriptionFuture, null, hashes.size());
AsyncCountDownLatch latch = new AsyncCountDownLatch();
latch.latch(() -> {
result.trySuccess(hashes);
}, hashes.size());
List<CompletableFuture<?>> subscriptionFutures = new ArrayList<>();
List<RTopic> topics = new ArrayList<>();
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String disabledAckName = RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX);
RTopic topic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE,
commandExecutor, disabledAckName);
topics.add(topic);
RFuture<Integer> topicFuture = topic.addListenerAsync(Object.class, new MessageListener<Object>() {
@Override
public void onMessage(CharSequence channel, Object msg) {
AtomicInteger counter = entry.getValue().getCounter();
if (counter.decrementAndGet() == 0) {
listener.decCounter();
}
RFuture<Integer> topicFuture = topic.addListenerAsync(Object.class, (channel, msg) -> {
AtomicInteger counter = entry.getValue().getCounter();
if (counter.decrementAndGet() == 0) {
latch.countDown();
}
});
topicFuture.onComplete((r, ex) -> {
subscribedFutures.decCounter();
});
subscriptionFutures.add(topicFuture.toCompletableFuture());
}
subscriptionFuture.onComplete((r, ex) -> {
CompletableFuture<Void> subscriptionFuture = CompletableFuture.allOf(subscriptionFutures.toArray(new CompletableFuture[0]));
subscriptionFuture.whenComplete((r, ex) -> {
RedissonBatch publishBatch = createBatch();
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String disabledKeysName = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX);
@ -508,7 +482,7 @@ public class RedissonTransaction implements RTransaction {
AtomicInteger counter = entry.getValue().getCounter();
if (counter.addAndGet(receivers.intValue()) == 0) {
listener.decCounter();
latch.countDown();
}
});
}

@ -59,7 +59,7 @@ public class RedisClientTest {
@Test
public void testConnectAsync() throws InterruptedException {
CompletableFuture<RedisConnection> f = redisClient.connectAsync();
CompletionStage<RedisConnection> f = redisClient.connectAsync();
CountDownLatch l = new CountDownLatch(2);
f.whenComplete((conn, e) -> {
assertThat(conn.sync(RedisCommands.PING)).isEqualTo("PONG");

Loading…
Cancel
Save