From 8a3943906a7a7c46c0a52788912e4168d6060be9 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 15 Dec 2021 15:55:34 +0300 Subject: [PATCH] refactoring --- .../redisson/connection/MasterSlaveEntry.java | 15 +++-- .../connection/SentinelConnectionManager.java | 14 ++-- .../balancer/LoadBalancerManager.java | 67 ++++++++----------- .../connection/pool/ConnectionPool.java | 35 +++++----- 4 files changed, 58 insertions(+), 73 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 4c346f401..f787e6d4b 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -137,12 +137,12 @@ public class MasterSlaveEntry { futures.add(masterAsSlaveFuture); } - RFuture writeFuture = writeConnectionPool.add(masterEntry); - futures.add(writeFuture.toCompletableFuture()); + CompletableFuture writeFuture = writeConnectionPool.add(masterEntry); + futures.add(writeFuture); if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { - RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); - futures.add(pubSubFuture.toCompletableFuture()); + CompletableFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); + futures.add(pubSubFuture); } return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); }).whenComplete((r, e) -> { @@ -326,9 +326,10 @@ public class MasterSlaveEntry { } } return slaveBalancer.add(entry); - }).exceptionally(ex -> { - client.shutdownAsync(); - return null; + }).whenComplete((r, ex) -> { + if (ex != null) { + client.shutdownAsync(); + } }); } diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index f6f4cebdd..89d36897a 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -458,9 +458,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { CompletableFuture resolvedFuture = CompletableFuture.allOf(masterAddrFuture.toCompletableFuture(), slaveAddrFuture.toCompletableFuture()); futures.add(resolvedFuture - .exceptionally(exc -> { - log.error("Unable to resolve addresses " + host + " and/or " + masterHost, exc); - return null; + .whenComplete((r, exc) -> { + if (exc != null) { + log.error("Unable to resolve addresses " + host + " and/or " + masterHost, exc); + } }) .thenCompose(res -> { RedisURI slaveAddr = slaveAddrFuture.getNow(); @@ -474,9 +475,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } currentSlaves.add(slaveAddr); - return addSlave(slaveAddr).exceptionally(e2 -> { - log.error("Unable to add slave " + slaveAddr, e2); - return null; + return addSlave(slaveAddr).whenComplete((r, e) -> { + if (e != null) { + log.error("Unable to add slave " + slaveAddr, e); + } }); })); } diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 13f6b6f05..318467478 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -30,7 +30,8 @@ import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.pool.PubSubConnectionPool; import org.redisson.connection.pool.SlaveConnectionPool; -import org.redisson.misc.*; +import org.redisson.misc.RedisURI; +import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,8 +39,6 @@ import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; /** * @@ -72,22 +71,17 @@ public class LoadBalancerManager { } } - public RFuture add(final ClientConnectionsEntry entry) { - RPromise result = new RedissonPromise(); - - CountableListener listener = new CountableListener(result, null, 2) { - @Override - protected void onSuccess(Void value) { - client2Entry.put(entry.getClient(), entry); - } - }; - - RFuture slaveFuture = slaveConnectionPool.add(entry); - slaveFuture.onComplete(listener); - - RFuture pubSubFuture = pubSubConnectionPool.add(entry); - pubSubFuture.onComplete(listener); - return result; + public CompletableFuture add(ClientConnectionsEntry entry) { + List> futures = new ArrayList<>(2); + CompletableFuture slaveFuture = slaveConnectionPool.add(entry); + futures.add(slaveFuture); + CompletableFuture pubSubFuture = pubSubConnectionPool.add(entry); + futures.add(pubSubFuture); + + CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + return future.thenAccept(r -> { + client2Entry.put(entry.getClient(), entry); + }); } public Collection getEntries() { @@ -141,28 +135,21 @@ public class LoadBalancerManager { if (!entry.isInitialized()) { entry.setInitialized(true); - AsyncCountDownLatch latch = new AsyncCountDownLatch(); - latch.latch(() -> { - entry.setFreezeReason(null); - }, 2); - - BiConsumer initCallBack = new BiConsumer() { - private final AtomicBoolean initConnError = new AtomicBoolean(false); - @Override - public void accept(Void r, Throwable ex) { - if (ex == null) { - latch.countDown(); - } else { - if (!initConnError.compareAndSet(false, true)) { - return; - } - entry.setInitialized(false); - } - } - }; entry.resetFirstFail(); - slaveConnectionPool.initConnections(entry).onComplete(initCallBack); - pubSubConnectionPool.initConnections(entry).onComplete(initCallBack); + + List> futures = new ArrayList<>(2); + futures.add(slaveConnectionPool.initConnections(entry)); + futures.add(pubSubConnectionPool.initConnections(entry)); + + CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + future.whenComplete((r, e) -> { + if (e != null) { + entry.setInitialized(false); + return; + } + + entry.setFreezeReason(null); + }); return true; } } diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index c7c488578..01ccd7086 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -67,48 +67,43 @@ abstract class ConnectionPool { this.connectionManager = connectionManager; } - public RFuture add(ClientConnectionsEntry entry) { - RPromise promise = new RedissonPromise(); - promise.onComplete((r, e) -> { - if (e == null) { - entries.add(entry); - } + public CompletableFuture add(ClientConnectionsEntry entry) { + CompletableFuture promise = initConnections(entry, true); + return promise.thenAccept(r -> { + entries.add(entry); }); - initConnections(entry, promise, true); - return promise; } - public RPromise initConnections(ClientConnectionsEntry entry) { - RPromise promise = new RedissonPromise(); - initConnections(entry, promise, false); - return promise; + public CompletableFuture initConnections(ClientConnectionsEntry entry) { + return initConnections(entry, false); } - private void initConnections(ClientConnectionsEntry entry, RPromise initPromise, boolean checkFreezed) { + private CompletableFuture initConnections(ClientConnectionsEntry entry, boolean checkFreezed) { int minimumIdleSize = getMinimumIdleSize(entry); if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) { - initPromise.trySuccess(null); - return; + return CompletableFuture.completedFuture(null); } + CompletableFuture initPromise = new CompletableFuture<>(); AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize); int startAmount = Math.min(10, minimumIdleSize); AtomicInteger requests = new AtomicInteger(startAmount); for (int i = 0; i < startAmount; i++) { createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections); } + return initPromise; } - private void createConnection(boolean checkFreezed, AtomicInteger requests, ClientConnectionsEntry entry, RPromise initPromise, - int minimumIdleSize, AtomicInteger initializedConnections) { + private void createConnection(boolean checkFreezed, AtomicInteger requests, ClientConnectionsEntry entry, + CompletableFuture initPromise, int minimumIdleSize, AtomicInteger initializedConnections) { if ((checkFreezed && entry.isFreezed()) || !tryAcquireConnection(entry)) { int totalInitializedConnections = minimumIdleSize - initializedConnections.get(); Throwable cause = new RedisConnectionException( "Unable to init enough connections amount! Only " + totalInitializedConnections + " of " + minimumIdleSize + " were initialized. Server: " + entry.getClient().getAddr()); - initPromise.tryFailure(cause); + initPromise.completeExceptionally(cause); return; } @@ -157,13 +152,13 @@ abstract class ConnectionPool { + " of " + minimumIdleSize + " were initialized. Redis server: " + entry.getClient().getAddr(); } Throwable cause = new RedisConnectionException(errorMsg, e); - initPromise.tryFailure(cause); + initPromise.completeExceptionally(cause); return; } int value = initializedConnections.decrementAndGet(); if (value == 0) { - if (initPromise.trySuccess(null)) { + if (initPromise.complete(null)) { log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr()); } } else if (value > 0 && !initPromise.isDone()) {