From ec21629103e72fb09a263210904e6dc83810ee15 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 29 Dec 2021 10:28:20 +0300 Subject: [PATCH] refactoring --- .../cluster/ClusterConnectionManager.java | 27 +++++++------------ .../command/RedisQueuedBatchExecutor.java | 7 +++-- .../connection/SentinelConnectionManager.java | 19 +++++-------- .../org/redisson/pubsub/PublishSubscribe.java | 2 +- .../pubsub/PublishSubscribeService.java | 14 +++++----- 5 files changed, 27 insertions(+), 42 deletions(-) diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 13d03bbe0..af720b2c4 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -33,16 +33,13 @@ import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.ReadMode; import org.redisson.connection.*; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; -import org.redisson.misc.RPromise; import org.redisson.misc.RedisURI; -import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.stream.Collectors; @@ -433,7 +430,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private void updateClusterState(ClusterServersConfig cfg, RedisConnection connection, Iterator iterator, RedisURI uri, AtomicReference lastException) { RFuture> future = connection.async(clusterNodesCommand); - future.onComplete((nodes, e) -> { + future.whenComplete((nodes, e) -> { if (e != null) { closeNodeConnection(connection); lastException.set(e); @@ -461,9 +458,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { CompletableFuture> newPartitionsFuture = parsePartitions(nodes); newPartitionsFuture.whenComplete((newPartitions, ex) -> { - RFuture masterFuture = checkMasterNodesChange(cfg, newPartitions); + CompletableFuture masterFuture = checkMasterNodesChange(cfg, newPartitions); checkSlaveNodesChange(newPartitions); - masterFuture.onComplete((res, exc) -> { + masterFuture.whenComplete((res, exc) -> { checkSlotsMigration(newPartitions); checkSlotsChange(newPartitions); getShutdownLatch().release(); @@ -555,7 +552,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { }); } - private RFuture checkMasterNodesChange(ClusterServersConfig cfg, Collection newPartitions) { + private CompletableFuture checkMasterNodesChange(ClusterServersConfig cfg, Collection newPartitions) { Map lastPartitions = getLastPartitonsByURI(); Map addedPartitions = new HashMap<>(); Set mastersElected = new HashSet<>(); @@ -596,20 +593,16 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { addedPartitions.keySet().removeAll(mastersElected); if (addedPartitions.isEmpty()) { - return RedissonPromise.newSucceededFuture(null); + return CompletableFuture.completedFuture(null); } - RPromise result = new RedissonPromise<>(); - AtomicInteger masters = new AtomicInteger(addedPartitions.size()); + List> futures = new ArrayList<>(); for (ClusterPartition newPart : addedPartitions.values()) { - CompletionStage future = addMasterEntry(newPart, cfg); - future.whenComplete((res, e) -> { - if (masters.decrementAndGet() == 0) { - result.trySuccess(null); - } - }); + CompletableFuture future = addMasterEntry(newPart, cfg); + futures.add(future); } - return result; + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .exceptionally(e -> null); } private void checkSlotsChange(Collection newPartitions) { diff --git a/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java b/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java index 0e65eb0a1..75030b6d8 100644 --- a/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java @@ -28,7 +28,6 @@ import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource.Redirect; import org.redisson.liveobject.core.RedissonObjectBuilder; import org.redisson.misc.LogHelper; -import org.redisson.misc.RedissonPromise; import java.util.ArrayList; import java.util.List; @@ -145,7 +144,7 @@ public class RedisQueuedBatchExecutor extends BaseRedisBatchExecutor if (connectionEntry.isFirstCommand()) { List> list = new ArrayList<>(2); - list.add(new CommandData<>(new RedissonPromise(), codec, RedisCommands.MULTI, new Object[]{})); + list.add(new CommandData<>(new CompletableFuture<>(), codec, RedisCommands.MULTI, new Object[]{})); list.add(new CommandData<>(attemptPromise, codec, command, params)); CompletableFuture main = new CompletableFuture<>(); writeFuture = connection.send(new CommandsData(main, list, true, syncSlaves)); @@ -157,13 +156,13 @@ public class RedisQueuedBatchExecutor extends BaseRedisBatchExecutor List> list = new ArrayList<>(); if (options.isSkipResult()) { - list.add(new CommandData<>(new RedissonPromise(), codec, RedisCommands.CLIENT_REPLY, new Object[]{"OFF"})); + list.add(new CommandData<>(new CompletableFuture<>(), codec, RedisCommands.CLIENT_REPLY, new Object[]{"OFF"})); } list.add(new CommandData<>(attemptPromise, codec, command, params)); if (options.isSkipResult()) { - list.add(new CommandData<>(new RedissonPromise(), codec, RedisCommands.CLIENT_REPLY, new Object[]{"ON"})); + list.add(new CommandData<>(new CompletableFuture<>(), codec, RedisCommands.CLIENT_REPLY, new Object[]{"ON"})); } if (options.getSyncSlaves() > 0) { BatchCommandData waitCommand = new BatchCommandData(RedisCommands.WAIT, diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 8ab8c34bb..454df756d 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -29,9 +29,7 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.config.*; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; -import org.redisson.misc.RPromise; import org.redisson.misc.RedisURI; -import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -577,31 +575,26 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return toURI(scheme, addr.getAddress().getHostAddress(), "" + addr.getPort()); } - private RFuture addSlave(RedisURI uri) { - RPromise result = new RedissonPromise(); + private CompletableFuture addSlave(RedisURI uri) { // to avoid addition twice MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); if (!entry.hasSlave(uri) && !config.checkSkipSlavesInit()) { CompletableFuture future = entry.addSlave(uri); - future.whenComplete((res, e) -> { + return future.whenComplete((res, e) -> { if (e != null) { - result.tryFailure(e); log.error("Can't add slave: " + uri, e); return; } if (entry.isSlaveUnfreezed(uri) || entry.slaveUp(uri, FreezeReason.MANAGER)) { log.info("slave: {} added", uri); - result.trySuccess(null); } }); - } else { - if (entry.hasSlave(uri)) { - slaveUp(uri); - } - result.trySuccess(null); } - return result; + if (entry.hasSlave(uri)) { + slaveUp(uri); + } + return CompletableFuture.completedFuture(null); } private void slaveDown(RedisURI uri) { diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java index 555a7c8ad..1d67270e1 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java @@ -51,7 +51,7 @@ abstract class PublishSubscribe> { throw new IllegalStateException(); } service.unsubscribe(PubSubType.UNSUBSCRIBE, new ChannelName(channelName)) - .onComplete((r, e) -> { + .whenComplete((r, e) -> { semaphore.release(); }); } else { diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index abea6086e..6ae86e542 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -303,7 +303,7 @@ public class PublishSubscribeService { } if (!connEntry.hasListeners(channelName)) { unsubscribe(type, channelName) - .onComplete((r, ex) -> { + .whenComplete((r, ex) -> { lock.release(); }); } else { @@ -386,14 +386,14 @@ public class PublishSubscribeService { }); } - public RFuture unsubscribe(PubSubType topicType, ChannelName channelName) { + public CompletableFuture unsubscribe(PubSubType topicType, ChannelName channelName) { PubSubConnectionEntry entry = name2PubSubConnection.remove(createKey(channelName)); if (entry == null || connectionManager.isShuttingDown()) { - return RedissonPromise.newSucceededFuture(null); + return CompletableFuture.completedFuture(null); } AtomicBoolean executed = new AtomicBoolean(); - RedissonPromise result = new RedissonPromise<>(); + CompletableFuture result = new CompletableFuture<>(); BaseRedisPubSubListener listener = new BaseRedisPubSubListener() { @Override @@ -406,7 +406,7 @@ public class PublishSubscribeService { msEntry.returnPubSubConnection(entry.getConnection()); } - result.trySuccess(null); + result.complete(null); return true; } return false; @@ -644,7 +644,7 @@ public class PublishSubscribeService { entry.removeListener(channelName, listener); if (!entry.hasListeners(channelName)) { unsubscribe(type, channelName) - .onComplete((r, ex) -> { + .whenComplete((r, ex) -> { if (counter.decrementAndGet() == 0) { semaphore.release(); promise.complete(null); @@ -686,7 +686,7 @@ public class PublishSubscribeService { } if (!entry.hasListeners(channelName)) { unsubscribe(type, channelName) - .onComplete((r, ex) -> { + .whenComplete((r, ex) -> { if (counter.decrementAndGet() == 0) { semaphore.release(); promise.complete(null);