From 10751fce404f30d0669cb331ed670e2a8284d659 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 14 Dec 2021 13:06:31 +0300 Subject: [PATCH] refactoring --- .../main/java/org/redisson/RedisNodes.java | 5 +- .../java/org/redisson/client/RedisClient.java | 56 ++++++------------- .../connection/ClientConnectionsEntry.java | 18 +++--- .../MasterSlaveConnectionManager.java | 4 +- .../connection/SentinelConnectionManager.java | 4 +- .../connection/pool/ConnectionPool.java | 13 +++-- .../connection/pool/PubSubConnectionPool.java | 4 +- .../redisson/redisnode/RedissonBaseNodes.java | 9 ++- .../redisson/redisnode/SentinelRedisNode.java | 5 +- .../java/org/redisson/RedisClientTest.java | 4 +- 10 files changed, 51 insertions(+), 71 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedisNodes.java b/redisson/src/main/java/org/redisson/RedisNodes.java index e9cc51468..8e29cb8a5 100644 --- a/redisson/src/main/java/org/redisson/RedisNodes.java +++ b/redisson/src/main/java/org/redisson/RedisNodes.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -121,8 +122,8 @@ public class RedisNodes implements NodesGroup { Map> result = new ConcurrentHashMap<>(clients.size()); CountDownLatch latch = new CountDownLatch(clients.size()); for (RedisClientEntry entry : clients) { - RFuture f = entry.getClient().connectAsync(); - f.onComplete((c, e) -> { + CompletableFuture f = entry.getClient().connectAsync(); + f.whenComplete((c, e) -> { if (c != null) { RFuture r = c.async(timeUnit.toMillis(timeout), RedisCommands.PING); result.put(c, r); diff --git a/redisson/src/main/java/org/redisson/client/RedisClient.java b/redisson/src/main/java/org/redisson/client/RedisClient.java index c141df000..de94488aa 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClient.java +++ b/redisson/src/main/java/org/redisson/client/RedisClient.java @@ -152,11 +152,13 @@ public final class RedisClient { public RedisConnection connect() { try { - return connectAsync().syncUninterruptibly().getNow(); - } catch (RedisException e) { - throw e; - } catch (Exception e) { - throw new RedisConnectionException("Unable to connect to: " + uri, e); + return connectAsync().join(); + } catch (CompletionException e) { + if (e.getCause() instanceof RedisException) { + throw e; + } else { + throw new RedisConnectionException("Unable to connect to: " + uri, e); + } } } @@ -197,9 +199,9 @@ public final class RedisClient { return promise; } - public RFuture connectAsync() { + public CompletableFuture connectAsync() { CompletableFuture addrFuture = resolveAddr(); - CompletableFuture ff = addrFuture.thenCompose(res -> { + return addrFuture.thenCompose(res -> { CompletableFuture r = new CompletableFuture<>(); ChannelFuture channelFuture = bootstrap.connect(res); channelFuture.addListener(new ChannelFutureListener() { @@ -239,33 +241,23 @@ public final class RedisClient { }); return r; }); - - RPromise res = new RedissonPromise<>(); - // TODO refactor - ff.whenComplete((r, e) -> { - if (e != null) { - res.tryFailure(e.getCause()); - return; - } - - res.trySuccess(r); - }); - return res; } public RedisPubSubConnection connectPubSub() { try { - return connectPubSubAsync().syncUninterruptibly().getNow(); - } catch (RedisException e) { - throw e; - } catch (Exception e) { - throw new RedisConnectionException("Unable to connect to: " + uri, e); + return connectPubSubAsync().join(); + } catch (CompletionException e) { + if (e.getCause() instanceof RedisException) { + throw e; + } else { + throw new RedisConnectionException("Unable to connect to: " + uri, e); + } } } - public RFuture connectPubSubAsync() { + public CompletableFuture connectPubSubAsync() { CompletableFuture nameFuture = resolveAddr(); - CompletableFuture ff = nameFuture.thenCompose(res -> { + return nameFuture.thenCompose(res -> { CompletableFuture r = new CompletableFuture<>(); ChannelFuture channelFuture = pubSubBootstrap.connect(res); channelFuture.addListener(new ChannelFutureListener() { @@ -305,18 +297,6 @@ public final class RedisClient { }); return r; }); - - RPromise res = new RedissonPromise<>(); - // TODO refactor - ff.whenComplete((r, e) -> { - if (e != null) { - res.tryFailure(e); - return; - } - - res.trySuccess(r); - }); - return res; } public void shutdown() { diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index 5ab9cfc17..85383dd45 100644 --- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -16,7 +16,6 @@ package org.redisson.connection; import org.redisson.api.NodeType; -import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; @@ -188,19 +187,18 @@ public class ClientConnectionsEntry { freeConnections.add(connection); } - public RFuture connect() { - RFuture future = client.connectAsync(); - future.onComplete((conn, e) -> { + public CompletableFuture connect() { + CompletableFuture future = client.connectAsync(); + return future.whenComplete((conn, e) -> { if (e != null) { return; } - + onConnect(conn); log.debug("new connection created: {}", conn); allConnections.add(conn); }); - return future; } private void onConnect(final RedisConnection conn) { @@ -224,20 +222,18 @@ public class ClientConnectionsEntry { connectionManager.getConnectionEventsHub().fireConnect(conn.getRedisClient().getAddr()); } - public RFuture connectPubSub() { - RFuture future = client.connectPubSubAsync(); - future.onComplete((res, e) -> { + public CompletableFuture connectPubSub() { + CompletableFuture future = client.connectPubSubAsync(); + return future.whenComplete((conn, e) -> { if (e != null) { return; } - RedisPubSubConnection conn = future.getNow(); onConnect(conn); log.debug("new pubsub connection created: {}", conn); allSubscribeConnections.add(conn); }); - return future; } public Queue getAllConnections() { diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 55fd5705d..80bce51ee 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -247,8 +247,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { RedisClient client = createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname); RPromise result = new RedissonPromise<>(); - RFuture future = client.connectAsync(); - future.onComplete((connection, e) -> { + CompletableFuture future = client.connectAsync(); + future.whenComplete((connection, e) -> { if (e != null) { result.tryFailure(e); return; diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index b5857b99f..24cc6667d 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -587,8 +587,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } } - RFuture f = client.connectAsync(); - return f.toCompletableFuture().thenApply(resp -> { + CompletableFuture f = client.connectAsync(); + return f.thenApply(resp -> { if (sentinels.putIfAbsent(ipAddr, client) == null) { log.info("sentinel: {} added", ipAddr); } diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 42c70739d..c7c488578 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -36,6 +36,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -247,8 +248,8 @@ abstract class ConnectionPool { return (T) entry.pollConnection(command); } - protected RFuture connect(ClientConnectionsEntry entry) { - return (RFuture) entry.connect(); + protected CompletableFuture connect(ClientConnectionsEntry entry) { + return (CompletableFuture) entry.connect(); } private void connectTo(ClientConnectionsEntry entry, RPromise promise, RedisCommand command) { @@ -273,8 +274,8 @@ abstract class ConnectionPool { } private void createConnection(ClientConnectionsEntry entry, RPromise promise) { - RFuture connFuture = connect(entry); - connFuture.onComplete((conn, e) -> { + CompletableFuture connFuture = connect(entry); + connFuture.whenComplete((conn, e) -> { if (e != null) { promiseFailure(entry, promise, e); return; @@ -353,8 +354,8 @@ abstract class ConnectionPool { } } - RFuture connectionFuture = entry.getClient().connectAsync(); - connectionFuture.onComplete((c, e) -> { + CompletableFuture connectionFuture = entry.getClient().connectAsync(); + connectionFuture.whenComplete((c, e) -> { synchronized (entry) { if (entry.getFreezeReason() != FreezeReason.RECONNECT) { return; diff --git a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java index 45c63894c..9f47b5aa5 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java @@ -24,6 +24,8 @@ import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; +import java.util.concurrent.CompletableFuture; + /** * Connection pool for Publish / Subscribe * @@ -51,7 +53,7 @@ public class PubSubConnectionPool extends ConnectionPool } @Override - protected RFuture connect(ClientConnectionsEntry entry) { + protected CompletableFuture connect(ClientConnectionsEntry entry) { return entry.connectPubSub(); } diff --git a/redisson/src/main/java/org/redisson/redisnode/RedissonBaseNodes.java b/redisson/src/main/java/org/redisson/redisnode/RedissonBaseNodes.java index 255362233..6e90f9860 100644 --- a/redisson/src/main/java/org/redisson/redisnode/RedissonBaseNodes.java +++ b/redisson/src/main/java/org/redisson/redisnode/RedissonBaseNodes.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -115,15 +116,13 @@ public class RedissonBaseNodes implements BaseRedisNodes { Map> result = new ConcurrentHashMap<>(clients.size()); CountDownLatch latch = new CountDownLatch(clients.size()); for (RedisNode entry : clients) { - RFuture f = entry.getClient().connectAsync(); - f.onComplete((c, e) -> { + CompletableFuture f = entry.getClient().connectAsync(); + f.whenComplete((c, e) -> { if (c != null) { RFuture r = c.async(timeUnit.toMillis(timeout), RedisCommands.PING); result.put(c, r); - latch.countDown(); - } else { - latch.countDown(); } + latch.countDown(); }); } diff --git a/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java b/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java index 8e09c34fa..24233e9b5 100644 --- a/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java +++ b/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java @@ -34,6 +34,7 @@ import org.redisson.misc.RedissonPromise; import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -119,8 +120,8 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync { private RFuture executeAsync(T defaultValue, Codec codec, long timeout, RedisCommand command, Object... params) { RPromise result = new RedissonPromise<>(); - RFuture connectionFuture = client.connectAsync(); - connectionFuture.onComplete((connection, ex) -> { + CompletableFuture connectionFuture = client.connectAsync(); + connectionFuture.whenComplete((connection, ex) -> { if (ex != null) { if (defaultValue != null) { result.trySuccess(defaultValue); diff --git a/redisson/src/test/java/org/redisson/RedisClientTest.java b/redisson/src/test/java/org/redisson/RedisClientTest.java index bc2cd2f34..23c08b5fe 100644 --- a/redisson/src/test/java/org/redisson/RedisClientTest.java +++ b/redisson/src/test/java/org/redisson/RedisClientTest.java @@ -59,9 +59,9 @@ public class RedisClientTest { @Test public void testConnectAsync() throws InterruptedException { - RFuture f = redisClient.connectAsync(); + CompletableFuture f = redisClient.connectAsync(); CountDownLatch l = new CountDownLatch(2); - f.onComplete((conn, e) -> { + f.whenComplete((conn, e) -> { assertThat(conn.sync(RedisCommands.PING)).isEqualTo("PONG"); l.countDown(); });