From 9d1643e75f5fbe7891c0fa572dc2f5b638d1ed0d Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 16 Dec 2021 11:44:08 +0300 Subject: [PATCH] refactoring --- .../main/java/org/redisson/RedisNodes.java | 7 +- .../java/org/redisson/client/RedisClient.java | 26 +- .../cluster/ClusterConnectionManager.java | 50 +-- .../redisson/command/CommandBatchService.java | 38 +- .../org/redisson/command/RedisExecutor.java | 9 +- .../connection/ClientConnectionsEntry.java | 11 +- .../connection/ConnectionManager.java | 3 +- .../MasterSlaveConnectionManager.java | 19 +- .../ReplicatedConnectionManager.java | 7 +- .../connection/SentinelConnectionManager.java | 39 +- .../connection/pool/ConnectionPool.java | 9 +- .../connection/pool/PubSubConnectionPool.java | 4 +- .../misc/CompletableFutureWrapper.java | 371 ++++++++++++++++++ .../redisson/redisnode/RedissonBaseNodes.java | 7 +- .../redisson/redisnode/SentinelRedisNode.java | 4 +- .../transaction/RedissonTransaction.java | 80 ++-- .../java/org/redisson/RedisClientTest.java | 2 +- 17 files changed, 507 insertions(+), 179 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java diff --git a/redisson/src/main/java/org/redisson/RedisNodes.java b/redisson/src/main/java/org/redisson/RedisNodes.java index 491c0bb2e..85d3edc63 100644 --- a/redisson/src/main/java/org/redisson/RedisNodes.java +++ b/redisson/src/main/java/org/redisson/RedisNodes.java @@ -31,10 +31,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * @@ -122,7 +119,7 @@ public class RedisNodes implements NodesGroup { Map> result = new ConcurrentHashMap<>(clients.size()); CountDownLatch latch = new CountDownLatch(clients.size()); for (RedisClientEntry entry : clients) { - CompletableFuture f = entry.getClient().connectAsync(); + CompletionStage f = entry.getClient().connectAsync(); f.whenComplete((c, e) -> { if (c != null) { RFuture r = c.async(timeUnit.toMillis(timeout), RedisCommands.PING); diff --git a/redisson/src/main/java/org/redisson/client/RedisClient.java b/redisson/src/main/java/org/redisson/client/RedisClient.java index 3328d39a0..04cb351e7 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClient.java +++ b/redisson/src/main/java/org/redisson/client/RedisClient.java @@ -32,8 +32,10 @@ import io.netty.util.NetUtil; import io.netty.util.Timer; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import org.redisson.api.RFuture; import org.redisson.client.handler.RedisChannelInitializer; import org.redisson.client.handler.RedisChannelInitializer.Type; +import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.RedisURI; import java.net.InetAddress; @@ -149,7 +151,7 @@ public final class RedisClient { public RedisConnection connect() { try { - return connectAsync().join(); + return connectAsync().toCompletableFuture().join(); } catch (CompletionException e) { if (e.getCause() instanceof RedisException) { throw (RedisException) e.getCause(); @@ -196,9 +198,9 @@ public final class RedisClient { return promise; } - public CompletableFuture connectAsync() { + public RFuture connectAsync() { CompletableFuture addrFuture = resolveAddr(); - return addrFuture.thenCompose(res -> { + CompletableFuture f = addrFuture.thenCompose(res -> { CompletableFuture r = new CompletableFuture<>(); ChannelFuture channelFuture = bootstrap.connect(res); channelFuture.addListener(new ChannelFutureListener() { @@ -238,11 +240,12 @@ public final class RedisClient { }); return r; }); + return new CompletableFutureWrapper<>(f); } public RedisPubSubConnection connectPubSub() { try { - return connectPubSubAsync().join(); + return connectPubSubAsync().toCompletableFuture().join(); } catch (CompletionException e) { if (e.getCause() instanceof RedisException) { throw (RedisException) e.getCause(); @@ -252,9 +255,9 @@ public final class RedisClient { } } - public CompletableFuture connectPubSubAsync() { + public RFuture connectPubSubAsync() { CompletableFuture nameFuture = resolveAddr(); - return nameFuture.thenCompose(res -> { + CompletableFuture f = nameFuture.thenCompose(res -> { CompletableFuture r = new CompletableFuture<>(); ChannelFuture channelFuture = pubSubBootstrap.connect(res); channelFuture.addListener(new ChannelFutureListener() { @@ -294,18 +297,19 @@ public final class RedisClient { }); return r; }); + return new CompletableFutureWrapper<>(f); } public void shutdown() { - shutdownAsync().join(); + shutdownAsync().toCompletableFuture().join(); } - public CompletableFuture shutdownAsync() { + public RFuture shutdownAsync() { shutdown = true; CompletableFuture result = new CompletableFuture<>(); if (channels.isEmpty() || config.getGroup().isShuttingDown()) { shutdown(result); - return result; + return new CompletableFutureWrapper<>(result); } ChannelGroupFuture channelsFuture = channels.newCloseFuture(); @@ -327,8 +331,8 @@ public final class RedisClient { connection.closeAsync(); } } - - return result; + + return new CompletableFutureWrapper<>(result); } public boolean isShutdown() { diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 164a05113..7f2a59571 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -33,7 +33,6 @@ import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.ReadMode; import org.redisson.connection.*; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; -import org.redisson.misc.AsyncCountDownLatch; import org.redisson.misc.RPromise; import org.redisson.misc.RedisURI; import org.redisson.misc.RedissonPromise; @@ -88,9 +87,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { List failedMasters = new ArrayList(); for (String address : cfg.getNodeAddresses()) { RedisURI addr = new RedisURI(address); - CompletableFuture connectionFuture = connectToNode(cfg, addr, addr.getHost()); + CompletionStage connectionFuture = connectToNode(cfg, addr, addr.getHost()); try { - RedisConnection connection = connectionFuture.join(); + RedisConnection connection = connectionFuture.toCompletableFuture().join(); if (cfg.getNodeAddresses().size() == 1 && !addr.isIP()) { configEndpointHostName = addr.getHost(); @@ -111,8 +110,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { lastClusterNode = addr; - RFuture> partitionsFuture = parsePartitions(nodes); - Collection partitions = partitionsFuture.syncUninterruptibly().getNow(); + CompletableFuture> partitionsFuture = parsePartitions(nodes); + Collection partitions = partitionsFuture.join(); List> masterFutures = new ArrayList<>(); for (ClusterPartition partition : partitions) { if (partition.isMasterFail()) { @@ -278,7 +277,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return result; } - CompletableFuture connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName); + CompletionStage connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName); connectionFuture.whenComplete((connection, ex1) -> { if (ex1 != null) { log.error("Can't connect to master: {} with slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); @@ -419,7 +418,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return; } RedisURI uri = iterator.next(); - CompletableFuture connectionFuture = connectToNode(cfg, uri, configEndpointHostName); + CompletionStage connectionFuture = connectToNode(cfg, uri, configEndpointHostName); connectionFuture.whenComplete((connection, e) -> { if (e != null) { lastException.set(e); @@ -461,8 +460,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { log.debug("cluster nodes state got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); } - RFuture> newPartitionsFuture = parsePartitions(nodes); - newPartitionsFuture.onComplete((newPartitions, ex) -> { + CompletableFuture> newPartitionsFuture = parsePartitions(nodes); + newPartitionsFuture.whenComplete((newPartitions, ex) -> { RFuture masterFuture = checkMasterNodesChange(cfg, newPartitions); checkSlaveNodesChange(newPartitions); masterFuture.onComplete((res, exc) -> { @@ -762,10 +761,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return natMapper.map(address); } - private RFuture> parsePartitions(List nodes) { + private CompletableFuture> parsePartitions(List nodes) { Map partitions = new ConcurrentHashMap<>(); - AsyncCountDownLatch latch = new AsyncCountDownLatch(); - int counter = 0; + List> futures = new ArrayList<>(); for (ClusterNodeInfo clusterNodeInfo : nodes) { if (clusterNodeInfo.containsFlag(Flag.NOADDR) || clusterNodeInfo.containsFlag(Flag.HANDSHAKE) @@ -787,13 +785,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { continue; } - RFuture ipFuture = resolveIP(clusterNodeInfo.getAddress()); - counter++; - ipFuture.onComplete((address, e) -> { - if (e != null) { - latch.countDown(); - return; - } + CompletableFuture ipFuture = resolveIP(clusterNodeInfo.getAddress()); + CompletableFuture f = ipFuture.thenAccept(address -> { if (clusterNodeInfo.containsFlag(Flag.SLAVE)) { ClusterPartition masterPartition = partitions.computeIfAbsent(masterId, k -> new ClusterPartition(masterId)); @@ -815,22 +808,21 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { masterPartition.setMasterFail(true); } } - latch.countDown(); }); + futures.add(f); } - RPromise> result = new RedissonPromise<>(); - latch.latch(() -> { + CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + return future.handle((r, e) -> { addCascadeSlaves(partitions.values()); List ps = partitions.values() - .stream() - .filter(cp -> cp.getType() == Type.MASTER - && cp.getMasterAddress() != null) - .collect(Collectors.toList()); - result.trySuccess(ps); - }, counter); - return result; + .stream() + .filter(cp -> cp.getType() == Type.MASTER + && cp.getMasterAddress() != null) + .collect(Collectors.toList()); + return ps; + }); } private void addCascadeSlaves(Collection partitions) { diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index d60323320..1e383b61b 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -34,15 +34,11 @@ import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; import org.redisson.liveobject.core.RedissonObjectBuilder; import org.redisson.misc.AsyncCountDownLatch; -import org.redisson.misc.CountableListener; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -408,38 +404,28 @@ public class CommandBatchService extends CommandAsyncService { return; } - RPromise>> mainPromise = new RedissonPromise<>(); Map> result = new ConcurrentHashMap<>(); - CountableListener>> listener = new CountableListener<>(mainPromise, result); - listener.setCounter(connections.size()); + List> futures = new ArrayList<>(commands.size()); for (Map.Entry entry : commands.entrySet()) { RPromise> execPromise = new RedissonPromise<>(); + async(entry.getValue().isReadOnlyMode(), new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC, new Object[] {}, execPromise, false, false); - execPromise.onComplete((r, ex) -> { - if (ex != null) { - mainPromise.tryFailure(ex); - return; - } + CompletionStage f = execPromise.thenCompose(r -> { BatchCommandData lastCommand = (BatchCommandData) entry.getValue().getCommands().peekLast(); result.put(entry.getKey(), r); + if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) { - lastCommand.getPromise().onComplete((res, e) -> { - if (e != null) { - mainPromise.tryFailure(e); - return; - } - - execPromise.onComplete(listener); - }); - } else { - execPromise.onComplete(listener); + return lastCommand.getPromise().thenApply(i -> null); } + return CompletableFuture.completedFuture(null); }); + futures.add(f.toCompletableFuture()); } - - mainPromise.onComplete((res, ex) -> { + + CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + future.whenComplete((res, ex) -> { executed.set(true); if (ex != null) { resultPromise.tryFailure(ex); @@ -447,7 +433,7 @@ public class CommandBatchService extends CommandAsyncService { } try { - for (java.util.Map.Entry> entry : res.entrySet()) { + for (java.util.Map.Entry> entry : result.entrySet()) { Entry commandEntry = commands.get(entry.getKey()); Iterator resultIter = entry.getValue().iterator(); for (BatchCommandData data : commandEntry.getCommands()) { diff --git a/redisson/src/main/java/org/redisson/command/RedisExecutor.java b/redisson/src/main/java/org/redisson/command/RedisExecutor.java index 4c15624d5..329e007d2 100644 --- a/redisson/src/main/java/org/redisson/command/RedisExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisExecutor.java @@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; @@ -416,8 +417,8 @@ public class RedisExecutor { onException(); - RFuture ipAddrFuture = connectionManager.resolveIP(ex.getUrl()); - ipAddrFuture.onComplete((ip, e) -> { + CompletableFuture ipAddrFuture = connectionManager.resolveIP(ex.getUrl()); + ipAddrFuture.whenComplete((ip, e) -> { if (e != null) { handleError(connectionFuture, e); return; @@ -433,8 +434,8 @@ public class RedisExecutor { onException(); - RFuture ipAddrFuture = connectionManager.resolveIP(ex.getUrl()); - ipAddrFuture.onComplete((ip, e) -> { + CompletableFuture ipAddrFuture = connectionManager.resolveIP(ex.getUrl()); + ipAddrFuture.whenComplete((ip, e) -> { if (e != null) { handleError(connectionFuture, e); return; diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index 85383dd45..b63f3b97c 100644 --- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import java.util.Deque; import java.util.Queue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; @@ -120,7 +121,7 @@ public class ClientConnectionsEntry { public CompletableFuture shutdownAsync() { connectionManager.getConnectionWatcher().remove(this); - return client.shutdownAsync(); + return client.shutdownAsync().toCompletableFuture(); } public RedisClient getClient() { @@ -187,8 +188,8 @@ public class ClientConnectionsEntry { freeConnections.add(connection); } - public CompletableFuture connect() { - CompletableFuture future = client.connectAsync(); + public CompletionStage connect() { + CompletionStage future = client.connectAsync(); return future.whenComplete((conn, e) -> { if (e != null) { return; @@ -222,8 +223,8 @@ public class ClientConnectionsEntry { connectionManager.getConnectionEventsHub().fireConnect(conn.getRedisClient().getAddr()); } - public CompletableFuture connectPubSub() { - CompletableFuture future = client.connectPubSubAsync(); + public CompletionStage connectPubSub() { + CompletionStage future = client.connectPubSubAsync(); return future.whenComplete((conn, e) -> { if (e != null) { return; diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index c230a9c76..cb78e5c9b 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -35,6 +35,7 @@ import org.redisson.pubsub.PublishSubscribeService; import java.net.InetSocketAddress; import java.util.Collection; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -47,7 +48,7 @@ public interface ConnectionManager { RedisURI applyNatMap(RedisURI address); - RFuture resolveIP(RedisURI address); + CompletableFuture resolveIP(RedisURI address); String getId(); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 1161894a1..3f1719dae 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -47,7 +47,6 @@ import org.redisson.client.protocol.RedisCommand; import org.redisson.cluster.ClusterSlotRange; import org.redisson.config.*; import org.redisson.misc.InfinitySemaphoreLatch; -import org.redisson.misc.RPromise; import org.redisson.misc.RedisURI; import org.redisson.misc.RedissonPromise; import org.redisson.pubsub.PublishSubscribeService; @@ -231,11 +230,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - protected final CompletableFuture connectToNode(BaseConfig cfg, RedisURI addr, String sslHostname) { + protected final CompletionStage connectToNode(BaseConfig cfg, RedisURI addr, String sslHostname) { return connectToNode(NodeType.MASTER, cfg, addr, sslHostname); } - protected final CompletableFuture connectToNode(NodeType type, BaseConfig cfg, RedisURI addr, String sslHostname) { + protected final CompletionStage connectToNode(NodeType type, BaseConfig cfg, RedisURI addr, String sslHostname) { RedisConnection conn = nodeConnections.get(addr); if (conn != null) { if (!conn.isActive()) { @@ -246,7 +245,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } RedisClient client = createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname); - CompletableFuture future = client.connectAsync(); + CompletionStage future = client.connectAsync(); return future.thenCompose(connection -> { if (connection.isActive()) { if (!addr.isIP()) { @@ -697,31 +696,31 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public RFuture resolveIP(RedisURI address) { + public CompletableFuture resolveIP(RedisURI address) { return resolveIP(address.getScheme(), address); } - protected RFuture resolveIP(String scheme, RedisURI address) { + protected CompletableFuture resolveIP(String scheme, RedisURI address) { if (address.isIP()) { RedisURI addr = applyNatMap(address); - return RedissonPromise.newSucceededFuture(addr); + return CompletableFuture.completedFuture(addr); } - RPromise result = new RedissonPromise<>(); + CompletableFuture result = new CompletableFuture<>(); AddressResolver resolver = resolverGroup.getResolver(getGroup().next()); InetSocketAddress addr = InetSocketAddress.createUnresolved(address.getHost(), address.getPort()); Future future = resolver.resolve(addr); future.addListener((FutureListener) f -> { if (!f.isSuccess()) { log.error("Unable to resolve " + address, f.cause()); - result.tryFailure(f.cause()); + result.completeExceptionally(f.cause()); return; } InetSocketAddress s = f.getNow(); RedisURI uri = new RedisURI(scheme + "://" + s.getAddress().getHostAddress() + ":" + address.getPort()); uri = applyNatMap(uri); - result.trySuccess(uri); + result.complete(uri); }); return result; } diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java index 9dcf132a9..ab9995142 100644 --- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -71,10 +72,10 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { for (String address : cfg.getNodeAddresses()) { RedisURI addr = new RedisURI(address); - CompletableFuture connectionFuture = connectToNode(cfg, addr, addr.getHost()); + CompletionStage connectionFuture = connectToNode(cfg, addr, addr.getHost()); RedisConnection connection = null; try { - connection = connectionFuture.join(); + connection = connectionFuture.toCompletableFuture().join(); } catch (Exception e) { // skip } @@ -161,7 +162,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { } private void checkNode(AsyncCountDownLatch latch, RedisURI uri, ReplicatedServersConfig cfg, Set slaveIPs) { - CompletableFuture connectionFuture = connectToNode(cfg, uri, uri.getHost()); + CompletionStage connectionFuture = connectToNode(cfg, uri, uri.getHost()); connectionFuture.whenComplete((connection, exc) -> { if (exc != null) { log.error(exc.getMessage(), exc); diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 89d36897a..8d73dac1f 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -122,7 +122,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { throw new RedisConnectionException("Master node is undefined! SENTINEL GET-MASTER-ADDR-BY-NAME command returns empty result!"); } - RedisURI masterHost = resolveIP(scheme, master).syncUninterruptibly().getNow(); + RedisURI masterHost = resolveIP(scheme, master).join(); this.config.setMasterAddress(masterHost.toString()); currentMaster.set(masterHost); log.info("master: {} added", masterHost); @@ -138,7 +138,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String flags = map.getOrDefault("flags", ""); String masterLinkStatus = map.getOrDefault("master-link-status", ""); - RedisURI uri = resolveIP(host, port).syncUninterruptibly().getNow(); + RedisURI uri = resolveIP(host, port).join(); this.config.addSlaveAddress(uri.toString()); log.debug("slave {} state: {}", uri, map); @@ -160,14 +160,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = map.get("ip"); String port = map.get("port"); - RedisURI uri = resolveIP(ip, port).syncUninterruptibly().getNow(); - CompletableFuture future = registerSentinel(uri, this.config, null); - connectionFutures.add(future); + RedisURI uri = resolveIP(ip, port).join(); + CompletionStage future = registerSentinel(uri, this.config, null); + connectionFutures.add(future.toCompletableFuture()); } RedisURI sentinelIp = toURI(connection.getRedisClient().getAddr()); - CompletableFuture f = registerSentinel(sentinelIp, this.config, null); - connectionFutures.add(f); + CompletionStage f = registerSentinel(sentinelIp, this.config, null); + connectionFutures.add(f.toCompletableFuture()); CompletableFuture future = CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[0])); try { @@ -181,6 +181,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { stopThreads(); throw e; } catch (Exception e) { + if (e instanceof CompletionException) { + e = (Exception) e.getCause(); + } lastException = e; log.warn(e.getMessage()); } finally { @@ -338,7 +341,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { RedisClient client = iterator.next(); RedisURI addr = toURI(client.getAddr()); - CompletableFuture connectionFuture = connectToNode(NodeType.SENTINEL, cfg, addr, null); + CompletionStage connectionFuture = connectToNode(NodeType.SENTINEL, cfg, addr, null); connectionFuture.whenComplete((connection, e) -> { if (e != null) { lastException.set(e); @@ -447,16 +450,16 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String masterHost = map.get("master-host"); String masterPort = map.get("master-port"); - RFuture slaveAddrFuture = resolveIP(host, port); - RFuture masterAddrFuture; + CompletableFuture slaveAddrFuture = resolveIP(host, port); + CompletableFuture masterAddrFuture; if ("?".equals(masterHost)) { - masterAddrFuture = RedissonPromise.newSucceededFuture(null); + masterAddrFuture = CompletableFuture.completedFuture(null); } else { masterAddrFuture = resolveIP(masterHost, masterPort); } - CompletableFuture resolvedFuture = CompletableFuture.allOf(masterAddrFuture.toCompletableFuture(), - slaveAddrFuture.toCompletableFuture()); + CompletableFuture resolvedFuture = CompletableFuture.allOf(masterAddrFuture, + slaveAddrFuture); futures.add(resolvedFuture .whenComplete((r, exc) -> { if (exc != null) { @@ -464,8 +467,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } }) .thenCompose(res -> { - RedisURI slaveAddr = slaveAddrFuture.getNow(); - RedisURI masterAddr = masterAddrFuture.getNow(); + RedisURI slaveAddr = slaveAddrFuture.getNow(null); + RedisURI masterAddr = masterAddrFuture.getNow(null); if (isSlaveDown(flags, masterLinkStatus)) { slaveDown(slaveAddr); return CompletableFuture.completedFuture(res); @@ -552,7 +555,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return disconnectedSlaves; } - private CompletableFuture registerSentinel(RedisURI addr, MasterSlaveServersConfig c, String sslHostname) { + private CompletionStage registerSentinel(RedisURI addr, MasterSlaveServersConfig c, String sslHostname) { boolean isHostname = NetUtil.createByteArrayFromIpAddressString(addr.getHost()) == null; if (!isHostname) { RedisClient sentinel = sentinels.get(addr); @@ -572,7 +575,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } } - CompletableFuture f = client.connectAsync(); + CompletionStage f = client.connectAsync(); return f.thenApply(resp -> { if (sentinels.putIfAbsent(ipAddr, client) == null) { log.info("sentinel: {} added", ipAddr); @@ -582,7 +585,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { }); } - private RFuture resolveIP(String host, String port) { + private CompletableFuture resolveIP(String host, String port) { RedisURI uri = toURI(host, port); return resolveIP(uri); } diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 01ccd7086..069a3c880 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -37,6 +37,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -243,8 +244,8 @@ abstract class ConnectionPool { return (T) entry.pollConnection(command); } - protected CompletableFuture connect(ClientConnectionsEntry entry) { - return (CompletableFuture) entry.connect(); + protected CompletionStage connect(ClientConnectionsEntry entry) { + return (CompletionStage) entry.connect(); } private void connectTo(ClientConnectionsEntry entry, RPromise promise, RedisCommand command) { @@ -269,7 +270,7 @@ abstract class ConnectionPool { } private void createConnection(ClientConnectionsEntry entry, RPromise promise) { - CompletableFuture connFuture = connect(entry); + CompletionStage connFuture = connect(entry); connFuture.whenComplete((conn, e) -> { if (e != null) { promiseFailure(entry, promise, e); @@ -349,7 +350,7 @@ abstract class ConnectionPool { } } - CompletableFuture connectionFuture = entry.getClient().connectAsync(); + CompletionStage connectionFuture = entry.getClient().connectAsync(); connectionFuture.whenComplete((c, e) -> { synchronized (entry) { if (entry.getFreezeReason() != FreezeReason.RECONNECT) { diff --git a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java index 9f47b5aa5..9e1c23a1a 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java @@ -24,7 +24,7 @@ import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; /** * Connection pool for Publish / Subscribe @@ -53,7 +53,7 @@ public class PubSubConnectionPool extends ConnectionPool } @Override - protected CompletableFuture connect(ClientConnectionsEntry entry) { + protected CompletionStage connect(ClientConnectionsEntry entry) { return entry.connectPubSub(); } diff --git a/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java b/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java new file mode 100644 index 000000000..b9fbf49da --- /dev/null +++ b/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java @@ -0,0 +1,371 @@ +/** + * Copyright (c) 2013-2021 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.misc; + +import org.redisson.api.RFuture; + +import java.util.concurrent.*; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * + * + * @author Nikita Koksharov + * @param value type + */ +public class CompletableFutureWrapper implements RFuture { + + private final CompletableFuture future; + private CompletableFuture lastFuture; + + public CompletableFutureWrapper(CompletableFuture future) { + this.future = future; + this.lastFuture = future; + } + + @Override + public CompletionStage thenApply(Function fn) { + return future.thenApply(fn); + } + + @Override + public CompletionStage thenApplyAsync(Function fn) { + return future.thenApplyAsync(fn); + } + + @Override + public CompletionStage thenApplyAsync(Function fn, Executor executor) { + return future.thenApplyAsync(fn, executor); + } + + @Override + public CompletionStage thenAccept(Consumer action) { + return future.thenAccept(action); + } + + @Override + public CompletionStage thenAcceptAsync(Consumer action) { + return future.thenAcceptAsync(action); + } + + @Override + public CompletionStage thenAcceptAsync(Consumer action, Executor executor) { + return future.thenAcceptAsync(action, executor); + } + + @Override + public CompletionStage thenRun(Runnable action) { + return future.thenRun(action); + } + + @Override + public CompletionStage thenRunAsync(Runnable action) { + return future.thenRunAsync(action); + } + + @Override + public CompletionStage thenRunAsync(Runnable action, Executor executor) { + return future.thenRunAsync(action, executor); + } + + @Override + public CompletionStage thenCombine(CompletionStage other, BiFunction fn) { + return future.thenCombine(other, fn); + } + + @Override + public CompletionStage thenCombineAsync(CompletionStage other, BiFunction fn) { + return future.thenCombineAsync(other, fn); + } + + @Override + public CompletionStage thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor) { + return future.thenCombineAsync(other, fn, executor); + } + + @Override + public CompletionStage thenAcceptBoth(CompletionStage other, BiConsumer action) { + return future.thenAcceptBoth(other, action); + } + + @Override + public CompletionStage thenAcceptBothAsync(CompletionStage other, BiConsumer action) { + return future.thenAcceptBothAsync(other, action); + } + + @Override + public CompletionStage thenAcceptBothAsync(CompletionStage other, BiConsumer action, Executor executor) { + return future.thenAcceptBothAsync(other, action, executor); + } + + @Override + public CompletionStage runAfterBoth(CompletionStage other, Runnable action) { + return future.runAfterBoth(other, action); + } + + @Override + public CompletionStage runAfterBothAsync(CompletionStage other, Runnable action) { + return future.runAfterBothAsync(other, action); + } + + @Override + public CompletionStage runAfterBothAsync(CompletionStage other, Runnable action, Executor executor) { + return future.runAfterBothAsync(other, action, executor); + } + + @Override + public CompletionStage applyToEither(CompletionStage other, Function fn) { + return future.applyToEither(other, fn); + } + + @Override + public CompletionStage applyToEitherAsync(CompletionStage other, Function fn) { + return future.applyToEitherAsync(other, fn); + } + + @Override + public CompletionStage applyToEitherAsync(CompletionStage other, Function fn, Executor executor) { + return future.applyToEitherAsync(other, fn, executor); + } + + @Override + public CompletionStage acceptEither(CompletionStage other, Consumer action) { + return future.acceptEither(other, action); + } + + @Override + public CompletionStage acceptEitherAsync(CompletionStage other, Consumer action) { + return future.acceptEitherAsync(other, action); + } + + @Override + public CompletionStage acceptEitherAsync(CompletionStage other, Consumer action, Executor executor) { + return future.acceptEitherAsync(other, action, executor); + } + + @Override + public CompletionStage runAfterEither(CompletionStage other, Runnable action) { + return future.runAfterEither(other, action); + } + + @Override + public CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action) { + return future.runAfterEitherAsync(other, action); + } + + @Override + public CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor) { + return future.runAfterEitherAsync(other, action, executor); + } + + @Override + public CompletionStage thenCompose(Function> fn) { + return future.thenCompose(fn); + } + + @Override + public CompletionStage thenComposeAsync(Function> fn) { + return future.thenComposeAsync(fn); + } + + @Override + public CompletionStage thenComposeAsync(Function> fn, Executor executor) { + return future.thenComposeAsync(fn, executor); + } + + @Override + public CompletionStage handle(BiFunction fn) { + return future.handle(fn); + } + + @Override + public CompletionStage handleAsync(BiFunction fn) { + return future.handleAsync(fn); + } + + @Override + public CompletionStage handleAsync(BiFunction fn, Executor executor) { + return future.handleAsync(fn, executor); + } + + @Override + public CompletionStage whenComplete(BiConsumer action) { + return future.whenComplete(action); + } + + @Override + public CompletionStage whenCompleteAsync(BiConsumer action) { + return future.whenCompleteAsync(action); + } + + @Override + public CompletionStage whenCompleteAsync(BiConsumer action, Executor executor) { + return future.whenCompleteAsync(action, executor); + } + + @Override + public CompletionStage exceptionally(Function fn) { + return future.exceptionally(fn); + } + + @Override + public CompletableFuture toCompletableFuture() { + return future; + } + + public V getNow(V valueIfAbsent) { + return future.getNow(valueIfAbsent); + } + + public boolean complete(V value) { + return future.complete(value); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return future.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } + + @Override + public boolean isDone() { + return future.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return future.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return future.get(timeout, unit); + } + + @Override + public boolean isSuccess() { + return future.isDone() && !future.isCompletedExceptionally(); + } + + @Override + public Throwable cause() { + if (future.isDone()) { + try { + future.getNow(null); + } catch (CompletionException e) { + return e.getCause(); + } catch (CancellationException e) { + return e; + } + } + return null; + } + + @Override + public V getNow() { + return future.getNow(null); + } + + @Override + public V join() { + return future.join(); + } + + @Override + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + try { + future.get(); + } catch (ExecutionException e) { + // skip + } + return future.isDone(); + } + + @Override + public boolean await(long timeoutMillis) throws InterruptedException { + return await(timeoutMillis, TimeUnit.MILLISECONDS); + } + + @Override + public RFuture sync() throws InterruptedException { + try { + future.get(); + return this; + } catch (ExecutionException e) { + throw (RuntimeException) e.getCause(); + } + } + + @Override + public RFuture syncUninterruptibly() { + try { + future.join(); + return this; + } catch (CompletionException e) { + throw (RuntimeException) e.getCause(); + } + } + + @Override + public RFuture await() throws InterruptedException { + try { + future.get(); + } catch (ExecutionException e) { + // skip + } + return this; + } + + @Override + public RFuture awaitUninterruptibly() { + try { + join(); + } catch (Exception e) { + // skip + } + return this; + } + + @Override + public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { + try { + future.get(timeout, unit); + } catch (ExecutionException | TimeoutException e) { + // skip + } catch (InterruptedException e) { + awaitUninterruptibly(timeout, unit); + } + return future.isDone(); + } + + @Override + public boolean awaitUninterruptibly(long timeoutMillis) { + return awaitUninterruptibly(timeoutMillis, TimeUnit.MILLISECONDS); + } + + @Override + public void onComplete(BiConsumer action) { + lastFuture = lastFuture.whenComplete(action); + } + +} diff --git a/redisson/src/main/java/org/redisson/redisnode/RedissonBaseNodes.java b/redisson/src/main/java/org/redisson/redisnode/RedissonBaseNodes.java index 6e90f9860..3e8b7db5a 100644 --- a/redisson/src/main/java/org/redisson/redisnode/RedissonBaseNodes.java +++ b/redisson/src/main/java/org/redisson/redisnode/RedissonBaseNodes.java @@ -30,10 +30,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * @@ -116,7 +113,7 @@ public class RedissonBaseNodes implements BaseRedisNodes { Map> result = new ConcurrentHashMap<>(clients.size()); CountDownLatch latch = new CountDownLatch(clients.size()); for (RedisNode entry : clients) { - CompletableFuture f = entry.getClient().connectAsync(); + CompletionStage f = entry.getClient().connectAsync(); f.whenComplete((c, e) -> { if (c != null) { RFuture r = c.async(timeUnit.toMillis(timeout), RedisCommands.PING); diff --git a/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java b/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java index 24233e9b5..198a53349 100644 --- a/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java +++ b/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java @@ -34,7 +34,7 @@ import org.redisson.misc.RedissonPromise; import java.net.InetSocketAddress; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; /** @@ -120,7 +120,7 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync { private RFuture executeAsync(T defaultValue, Codec codec, long timeout, RedisCommand command, Object... params) { RPromise result = new RedissonPromise<>(); - CompletableFuture connectionFuture = client.connectAsync(); + CompletionStage connectionFuture = client.connectAsync(); connectionFuture.whenComplete((connection, ex) -> { if (ex != null) { if (defaultValue != null) { diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java index bf4ec5477..2cd62307e 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java @@ -15,40 +15,14 @@ */ package org.redisson.transaction; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - +import io.netty.buffer.ByteBufUtil; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import org.redisson.RedissonBatch; import org.redisson.RedissonLocalCachedMap; import org.redisson.RedissonObject; import org.redisson.RedissonTopic; -import org.redisson.api.BatchOptions; -import org.redisson.api.BatchResult; -import org.redisson.api.RBucket; -import org.redisson.api.RBuckets; -import org.redisson.api.RFuture; -import org.redisson.api.RLocalCachedMap; -import org.redisson.api.RMap; -import org.redisson.api.RMapCache; -import org.redisson.api.RMultimapCacheAsync; -import org.redisson.api.RSet; -import org.redisson.api.RSetCache; -import org.redisson.api.RTopic; -import org.redisson.api.RTopicAsync; -import org.redisson.api.RTransaction; -import org.redisson.api.TransactionOptions; +import org.redisson.api.*; import org.redisson.api.listener.MessageListener; import org.redisson.cache.LocalCachedMapDisable; import org.redisson.cache.LocalCachedMapDisabledKey; @@ -58,15 +32,17 @@ import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandBatchService; import org.redisson.connection.MasterSlaveEntry; -import org.redisson.misc.CountableListener; +import org.redisson.misc.AsyncCountDownLatch; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.transaction.operation.TransactionalOperation; import org.redisson.transaction.operation.map.MapOperation; -import io.netty.buffer.ByteBufUtil; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** * @@ -464,33 +440,31 @@ public class RedissonTransaction implements RTransaction { result.tryFailure(e); return; } - - CountableListener> listener = - new CountableListener<>(result, hashes, hashes.size()); - RPromise subscriptionFuture = new RedissonPromise<>(); - CountableListener subscribedFutures = new CountableListener<>(subscriptionFuture, null, hashes.size()); - + + AsyncCountDownLatch latch = new AsyncCountDownLatch(); + latch.latch(() -> { + result.trySuccess(hashes); + }, hashes.size()); + + List> subscriptionFutures = new ArrayList<>(); + List topics = new ArrayList<>(); for (Entry entry : hashes.entrySet()) { String disabledAckName = RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX); RTopic topic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor, disabledAckName); topics.add(topic); - RFuture topicFuture = topic.addListenerAsync(Object.class, new MessageListener() { - @Override - public void onMessage(CharSequence channel, Object msg) { - AtomicInteger counter = entry.getValue().getCounter(); - if (counter.decrementAndGet() == 0) { - listener.decCounter(); - } + RFuture topicFuture = topic.addListenerAsync(Object.class, (channel, msg) -> { + AtomicInteger counter = entry.getValue().getCounter(); + if (counter.decrementAndGet() == 0) { + latch.countDown(); } }); - topicFuture.onComplete((r, ex) -> { - subscribedFutures.decCounter(); - }); + subscriptionFutures.add(topicFuture.toCompletableFuture()); } - - subscriptionFuture.onComplete((r, ex) -> { + + CompletableFuture subscriptionFuture = CompletableFuture.allOf(subscriptionFutures.toArray(new CompletableFuture[0])); + subscriptionFuture.whenComplete((r, ex) -> { RedissonBatch publishBatch = createBatch(); for (Entry entry : hashes.entrySet()) { String disabledKeysName = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX); @@ -508,7 +482,7 @@ public class RedissonTransaction implements RTransaction { AtomicInteger counter = entry.getValue().getCounter(); if (counter.addAndGet(receivers.intValue()) == 0) { - listener.decCounter(); + latch.countDown(); } }); } diff --git a/redisson/src/test/java/org/redisson/RedisClientTest.java b/redisson/src/test/java/org/redisson/RedisClientTest.java index 23c08b5fe..923713d4a 100644 --- a/redisson/src/test/java/org/redisson/RedisClientTest.java +++ b/redisson/src/test/java/org/redisson/RedisClientTest.java @@ -59,7 +59,7 @@ public class RedisClientTest { @Test public void testConnectAsync() throws InterruptedException { - CompletableFuture f = redisClient.connectAsync(); + CompletionStage f = redisClient.connectAsync(); CountDownLatch l = new CountDownLatch(2); f.whenComplete((conn, e) -> { assertThat(conn.sync(RedisCommands.PING)).isEqualTo("PONG");