From 39c1b70b53b4f10aba28ebbb033c7d96e7183ed7 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 15 Dec 2021 10:22:59 +0300 Subject: [PATCH] refactoring --- .../java/org/redisson/client/RedisClient.java | 4 +- .../cluster/ClusterConnectionManager.java | 56 +++--- .../MasterSlaveConnectionManager.java | 22 +-- .../ReplicatedConnectionManager.java | 14 +- .../connection/SentinelConnectionManager.java | 171 ++++++++---------- 5 files changed, 125 insertions(+), 142 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/RedisClient.java b/redisson/src/main/java/org/redisson/client/RedisClient.java index babd51dd2..3328d39a0 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClient.java +++ b/redisson/src/main/java/org/redisson/client/RedisClient.java @@ -152,7 +152,7 @@ public final class RedisClient { return connectAsync().join(); } catch (CompletionException e) { if (e.getCause() instanceof RedisException) { - throw e; + throw (RedisException) e.getCause(); } else { throw new RedisConnectionException("Unable to connect to: " + uri, e); } @@ -245,7 +245,7 @@ public final class RedisClient { return connectPubSubAsync().join(); } catch (CompletionException e) { if (e.getCause() instanceof RedisException) { - throw e; + throw (RedisException) e.getCause(); } else { throw new RedisConnectionException("Unable to connect to: " + uri, e); } diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index f85eb9dbe..164a05113 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -42,10 +42,7 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -91,9 +88,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { List failedMasters = new ArrayList(); for (String address : cfg.getNodeAddresses()) { RedisURI addr = new RedisURI(address); - RFuture connectionFuture = connectToNode(cfg, addr, addr.getHost()); + CompletableFuture connectionFuture = connectToNode(cfg, addr, addr.getHost()); try { - RedisConnection connection = connectionFuture.syncUninterruptibly().getNow(); + RedisConnection connection = connectionFuture.join(); if (cfg.getNodeAddresses().size() == 1 && !addr.isIP()) { configEndpointHostName = addr.getHost(); @@ -116,7 +113,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { RFuture> partitionsFuture = parsePartitions(nodes); Collection partitions = partitionsFuture.syncUninterruptibly().getNow(); - List> masterFutures = new ArrayList<>(); + List> masterFutures = new ArrayList<>(); for (ClusterPartition partition : partitions) { if (partition.isMasterFail()) { failedMasters.add(partition.getMasterAddress().toString()); @@ -126,18 +123,21 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { throw new IllegalStateException("Master node: " + partition.getNodeId() + " doesn't have address."); } - RFuture masterFuture = addMasterEntry(partition, cfg); + CompletableFuture masterFuture = addMasterEntry(partition, cfg); masterFutures.add(masterFuture); } - for (RFuture masterFuture : masterFutures) { - masterFuture.awaitUninterruptibly(); - if (!masterFuture.isSuccess()) { - lastException = masterFuture.cause(); - } + CompletableFuture masterFuture = CompletableFuture.allOf(masterFutures.toArray(new CompletableFuture[0])); + try { + masterFuture.join(); + } catch (CompletionException e) { + lastException = e.getCause(); } break; } catch (Exception e) { + if (e instanceof CompletionException) { + e = (Exception) e.getCause(); + } lastException = e; log.warn(e.getMessage()); } @@ -262,7 +262,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return result; } - private RFuture addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) { + private CompletableFuture addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) { + CompletableFuture result = new CompletableFuture<>(); + if (partition.isMasterFail()) { RedisException e = new RedisException("Failed to add master: " + partition.getMasterAddress() + " for slot ranges: " + @@ -272,15 +274,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { e = new RedisException("Failed to add master: " + partition.getMasterAddress() + ". Reason - server has FAIL flag"); } - return RedissonPromise.newFailedFuture(e); + result.completeExceptionally(e); + return result; } - RPromise result = new RedissonPromise<>(); - RFuture connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName); - connectionFuture.onComplete((connection, ex1) -> { + CompletableFuture connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName); + connectionFuture.whenComplete((connection, ex1) -> { if (ex1 != null) { log.error("Can't connect to master: {} with slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); - result.tryFailure(ex1); + result.completeExceptionally(ex1); return; } @@ -301,7 +303,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { f.whenComplete((masterClient, ex3) -> { if (ex3 != null) { log.error("Can't add master: " + partition.getMasterAddress() + " for slot ranges: " + partition.getSlotRanges(), ex3); - result.tryFailure(ex3); + result.completeExceptionally(ex3); return; } @@ -316,7 +318,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { if (ex != null) { log.error("unable to add slave for: " + partition.getMasterAddress() + " slot ranges: " + partition.getSlotRanges(), ex); - result.tryFailure(ex); + result.completeExceptionally(ex); return; } @@ -327,14 +329,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } } - if (result.trySuccess(null)) { + if (result.complete(null)) { log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); } else { log.error("unable to add master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); } }); } else { - if (result.trySuccess(null)) { + if (result.complete(null)) { log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); } else { log.error("unable to add master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); @@ -417,8 +419,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return; } RedisURI uri = iterator.next(); - RFuture connectionFuture = connectToNode(cfg, uri, configEndpointHostName); - connectionFuture.onComplete((connection, e) -> { + CompletableFuture connectionFuture = connectToNode(cfg, uri, configEndpointHostName); + connectionFuture.whenComplete((connection, e) -> { if (e != null) { lastException.set(e); getShutdownLatch().release(); @@ -602,8 +604,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { RPromise result = new RedissonPromise<>(); AtomicInteger masters = new AtomicInteger(addedPartitions.size()); for (ClusterPartition newPart : addedPartitions.values()) { - RFuture future = addMasterEntry(newPart, cfg); - future.onComplete((res, e) -> { + CompletionStage future = addMasterEntry(newPart, cfg); + future.whenComplete((res, e) -> { if (masters.decrementAndGet() == 0) { result.trySuccess(null); } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 80bce51ee..1161894a1 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -231,29 +231,23 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - protected final RFuture connectToNode(BaseConfig cfg, RedisURI addr, String sslHostname) { + protected final CompletableFuture connectToNode(BaseConfig cfg, RedisURI addr, String sslHostname) { return connectToNode(NodeType.MASTER, cfg, addr, sslHostname); } - protected final RFuture connectToNode(NodeType type, BaseConfig cfg, RedisURI addr, String sslHostname) { + protected final CompletableFuture connectToNode(NodeType type, BaseConfig cfg, RedisURI addr, String sslHostname) { RedisConnection conn = nodeConnections.get(addr); if (conn != null) { if (!conn.isActive()) { closeNodeConnection(conn); } else { - return RedissonPromise.newSucceededFuture(conn); + return CompletableFuture.completedFuture(conn); } } RedisClient client = createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname); - RPromise result = new RedissonPromise<>(); CompletableFuture future = client.connectAsync(); - future.whenComplete((connection, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + return future.thenCompose(connection -> { if (connection.isActive()) { if (!addr.isIP()) { RedisURI address = new RedisURI(addr.getScheme() @@ -262,14 +256,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager { nodeConnections.put(address, connection); } nodeConnections.put(addr, connection); - result.trySuccess(connection); + return CompletableFuture.completedFuture(connection); } else { connection.closeAsync(); - result.tryFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!")); + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!")); + return f; } }); - - return result; } @Override diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java index 6f39e7acf..9dcf132a9 100644 --- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java @@ -71,9 +71,13 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { for (String address : cfg.getNodeAddresses()) { RedisURI addr = new RedisURI(address); - RFuture connectionFuture = connectToNode(cfg, addr, addr.getHost()); - connectionFuture.awaitUninterruptibly(); - RedisConnection connection = connectionFuture.getNow(); + CompletableFuture connectionFuture = connectToNode(cfg, addr, addr.getHost()); + RedisConnection connection = null; + try { + connection = connectionFuture.join(); + } catch (Exception e) { + // skip + } if (connection == null) { continue; } @@ -157,8 +161,8 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { } private void checkNode(AsyncCountDownLatch latch, RedisURI uri, ReplicatedServersConfig cfg, Set slaveIPs) { - RFuture connectionFuture = connectToNode(cfg, uri, uri.getHost()); - connectionFuture.onComplete((connection, exc) -> { + CompletableFuture connectionFuture = connectToNode(cfg, uri, uri.getHost()); + connectionFuture.whenComplete((connection, exc) -> { if (exc != null) { log.error(exc.getMessage(), exc); latch.countDown(); diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 24cc6667d..f6f4cebdd 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -29,7 +29,6 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.config.*; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; -import org.redisson.misc.AsyncCountDownLatch; import org.redisson.misc.RPromise; import org.redisson.misc.RedisURI; import org.redisson.misc.RedissonPromise; @@ -41,10 +40,8 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import java.util.stream.Collectors; /** @@ -341,8 +338,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { RedisClient client = iterator.next(); RedisURI addr = toURI(client.getAddr()); - RFuture connectionFuture = connectToNode(NodeType.SENTINEL, cfg, addr, null); - connectionFuture.onComplete((connection, e) -> { + CompletableFuture connectionFuture = connectToNode(NodeType.SENTINEL, cfg, addr, null); + connectionFuture.whenComplete((connection, e) -> { if (e != null) { lastException.set(e); getShutdownLatch().release(); @@ -356,54 +353,46 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } private void updateState(SentinelServersConfig cfg, RedisConnection connection, Iterator iterator) { - AtomicInteger commands = new AtomicInteger(2); - BiConsumer commonListener = new BiConsumer() { - - private final AtomicBoolean failed = new AtomicBoolean(); - - @Override - public void accept(Object t, Throwable u) { - if (commands.decrementAndGet() == 0) { - getShutdownLatch().release(); - if (failed.get()) { - scheduleChangeCheck(cfg, iterator); - } else { - scheduleChangeCheck(cfg, null); - } - } - if (u != null && failed.compareAndSet(false, true)) { - log.error("Can't execute SENTINEL commands on " + connection.getRedisClient().getAddr(), u); - closeNodeConnection(connection); - } - } - }; - - RFuture masterFuture = checkMasterChange(cfg, connection); - masterFuture.onComplete(commonListener); + List> futures = new ArrayList<>(); + CompletionStage masterFuture = checkMasterChange(cfg, connection); + futures.add(masterFuture.toCompletableFuture()); if (!config.checkSkipSlavesInit()) { - commands.incrementAndGet(); - RFuture>> slavesFuture = checkSlavesChange(cfg, connection); - slavesFuture.onComplete(commonListener); + CompletionStage slavesFuture = checkSlavesChange(cfg, connection); + futures.add(slavesFuture.toCompletableFuture()); } - RFuture>> sentinelsFuture = checkSentinelsChange(cfg, connection); - sentinelsFuture.onComplete(commonListener); + CompletionStage sentinelsFuture = checkSentinelsChange(cfg, connection); + futures.add(sentinelsFuture.toCompletableFuture()); + + CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + future.whenComplete((r, e) -> { + if (e != null) { + log.error("Can't execute SENTINEL commands on " + connection.getRedisClient().getAddr(), e); + closeNodeConnection(connection); + } + + getShutdownLatch().release(); + if (e != null) { + scheduleChangeCheck(cfg, iterator); + } else { + scheduleChangeCheck(cfg, null); + } + }); } - private RFuture>> checkSentinelsChange(SentinelServersConfig cfg, RedisConnection connection) { + private CompletionStage checkSentinelsChange(SentinelServersConfig cfg, RedisConnection connection) { if (!cfg.isSentinelsDiscovery()) { - return RedissonPromise.newSucceededFuture(null); + return CompletableFuture.completedFuture(null); } RFuture>> sentinelsFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName()); - sentinelsFuture.onComplete((list, e) -> { - if (e != null || list.isEmpty()) { - return; + return sentinelsFuture.thenCompose(list -> { + if (list.isEmpty()) { + return CompletableFuture.completedFuture(null); } - AsyncCountDownLatch latch = new AsyncCountDownLatch(); - List> newUris = list.stream().filter(m -> { + List> newUris = list.stream().filter(m -> { String flags = m.getOrDefault("flags", ""); String masterLinkStatus = m.getOrDefault("master-link-status", ""); if (!m.isEmpty() && !isSlaveDown(flags, masterLinkStatus)) { @@ -415,40 +404,39 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String port = m.get("port"); return toURI(ip, port); }).map(addr -> { - RFuture f = resolveIP(addr); - f.onComplete((res, ex) -> { - if (ex != null) { - log.error("unable to resolve hostname", ex); - } - latch.countDown(); - }); - return f; + CompletionStage f = resolveIP(addr); + return f.exceptionally(ex -> { + log.error("unable to resolve hostname", ex); + return null; + }).toCompletableFuture(); }).collect(Collectors.toList()); - latch.latch(() -> { - List uris = newUris.stream().map(u -> u.getNow()).filter(u -> u != null).collect(Collectors.toList()); + CompletableFuture futures = CompletableFuture.allOf(newUris.toArray(new CompletableFuture[0])); + return futures.whenComplete((r, ex) -> { + List uris = newUris.stream().map(u -> { + try { + return u.getNow(null); + } catch (Exception exc) { + return null; + } + }).filter(u -> u != null).collect(Collectors.toList()); + InetSocketAddress addr = connection.getRedisClient().getAddr(); RedisURI currentAddr = toURI(addr); uris.add(currentAddr); updateSentinels(uris); - }, newUris.size()); + }); }); - return sentinelsFuture; } - private RFuture>> checkSlavesChange(SentinelServersConfig cfg, RedisConnection connection) { + private CompletionStage checkSlavesChange(SentinelServersConfig cfg, RedisConnection connection) { RFuture>> slavesFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName()); - slavesFuture.onComplete((slavesMap, ex) -> { - if (ex != null) { - return; - } - + return slavesFuture.thenCompose(slavesMap -> { Set currentSlaves = Collections.newSetFromMap(new ConcurrentHashMap<>(slavesMap.size())); - AsyncCountDownLatch latch = new AsyncCountDownLatch(); + List> futures = new ArrayList<>(); for (Map map : slavesMap) { if (map.isEmpty()) { - latch.countDown(); continue; } @@ -466,52 +454,48 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } else { masterAddrFuture = resolveIP(masterHost, masterPort); } + CompletableFuture resolvedFuture = CompletableFuture.allOf(masterAddrFuture.toCompletableFuture(), slaveAddrFuture.toCompletableFuture()); - resolvedFuture.whenComplete((res, exc) -> { - if (exc != null) { - log.error("Unable to resolve addresses " + host + " and/or " + masterHost, exc); - latch.countDown(); - return; - } - - RedisURI slaveAddr = slaveAddrFuture.getNow(); - RedisURI masterAddr = masterAddrFuture.getNow(); - if (isSlaveDown(flags, masterLinkStatus)) { - slaveDown(slaveAddr); - latch.countDown(); - return; - } - if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterAddr)) { - latch.countDown(); - return; - } - - currentSlaves.add(slaveAddr); - addSlave(slaveAddr).onComplete((r, e2) -> { - latch.countDown(); - if (e2 != null) { - log.error("Unable to add slave " + slaveAddr, e2); - } - }); - }); + futures.add(resolvedFuture + .exceptionally(exc -> { + log.error("Unable to resolve addresses " + host + " and/or " + masterHost, exc); + return null; + }) + .thenCompose(res -> { + RedisURI slaveAddr = slaveAddrFuture.getNow(); + RedisURI masterAddr = masterAddrFuture.getNow(); + if (isSlaveDown(flags, masterLinkStatus)) { + slaveDown(slaveAddr); + return CompletableFuture.completedFuture(res); + } + if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterAddr)) { + return CompletableFuture.completedFuture(res); + } + + currentSlaves.add(slaveAddr); + return addSlave(slaveAddr).exceptionally(e2 -> { + log.error("Unable to add slave " + slaveAddr, e2); + return null; + }); + })); } - latch.latch(() -> { + CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + return future.whenComplete((r, exc) -> { MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); entry.getAllEntries().stream() .map(e -> e.getClient().getAddr()) .map(a -> toURI(a.getAddress().getHostAddress(), String.valueOf(a.getPort()))) .filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get())) .forEach(a -> slaveDown(a)); - }, slavesMap.size()); + }); }); - return slavesFuture; } - private RFuture checkMasterChange(SentinelServersConfig cfg, RedisConnection connection) { + private CompletionStage checkMasterChange(SentinelServersConfig cfg, RedisConnection connection) { RFuture masterFuture = connection.async(StringCodec.INSTANCE, masterHostCommand, cfg.getMasterName()); - masterFuture.thenCompose(u -> resolveIP(scheme, u)) + return masterFuture.thenCompose(u -> resolveIP(scheme, u)) .whenComplete((newMaster, e) -> { if (e != null) { return; @@ -527,7 +511,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { }); } }); - return masterFuture; } private void updateSentinels(Collection newUris) {