refactoring

pull/4056/head
Nikita Koksharov 3 years ago
parent 227deaa19a
commit ec21629103

@ -33,16 +33,13 @@ import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.connection.*;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
@ -433,7 +430,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private void updateClusterState(ClusterServersConfig cfg, RedisConnection connection,
Iterator<RedisURI> iterator, RedisURI uri, AtomicReference<Throwable> lastException) {
RFuture<List<ClusterNodeInfo>> future = connection.async(clusterNodesCommand);
future.onComplete((nodes, e) -> {
future.whenComplete((nodes, e) -> {
if (e != null) {
closeNodeConnection(connection);
lastException.set(e);
@ -461,9 +458,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
CompletableFuture<Collection<ClusterPartition>> newPartitionsFuture = parsePartitions(nodes);
newPartitionsFuture.whenComplete((newPartitions, ex) -> {
RFuture<Void> masterFuture = checkMasterNodesChange(cfg, newPartitions);
CompletableFuture<Void> masterFuture = checkMasterNodesChange(cfg, newPartitions);
checkSlaveNodesChange(newPartitions);
masterFuture.onComplete((res, exc) -> {
masterFuture.whenComplete((res, exc) -> {
checkSlotsMigration(newPartitions);
checkSlotsChange(newPartitions);
getShutdownLatch().release();
@ -555,7 +552,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
});
}
private RFuture<Void> checkMasterNodesChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
private CompletableFuture<Void> checkMasterNodesChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
Map<RedisURI, ClusterPartition> lastPartitions = getLastPartitonsByURI();
Map<RedisURI, ClusterPartition> addedPartitions = new HashMap<>();
Set<RedisURI> mastersElected = new HashSet<>();
@ -596,20 +593,16 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
addedPartitions.keySet().removeAll(mastersElected);
if (addedPartitions.isEmpty()) {
return RedissonPromise.newSucceededFuture(null);
return CompletableFuture.completedFuture(null);
}
RPromise<Void> result = new RedissonPromise<>();
AtomicInteger masters = new AtomicInteger(addedPartitions.size());
List<CompletableFuture<?>> futures = new ArrayList<>();
for (ClusterPartition newPart : addedPartitions.values()) {
CompletionStage<Void> future = addMasterEntry(newPart, cfg);
future.whenComplete((res, e) -> {
if (masters.decrementAndGet() == 0) {
result.trySuccess(null);
}
});
CompletableFuture<Void> future = addMasterEntry(newPart, cfg);
futures.add(future);
}
return result;
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.exceptionally(e -> null);
}
private void checkSlotsChange(Collection<ClusterPartition> newPartitions) {

@ -28,7 +28,6 @@ import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RedissonPromise;
import java.util.ArrayList;
import java.util.List;
@ -145,7 +144,7 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
if (connectionEntry.isFirstCommand()) {
List<CommandData<?, ?>> list = new ArrayList<>(2);
list.add(new CommandData<>(new RedissonPromise<Void>(), codec, RedisCommands.MULTI, new Object[]{}));
list.add(new CommandData<>(new CompletableFuture<>(), codec, RedisCommands.MULTI, new Object[]{}));
list.add(new CommandData<>(attemptPromise, codec, command, params));
CompletableFuture<Void> main = new CompletableFuture<>();
writeFuture = connection.send(new CommandsData(main, list, true, syncSlaves));
@ -157,13 +156,13 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
List<CommandData<?, ?>> list = new ArrayList<>();
if (options.isSkipResult()) {
list.add(new CommandData<>(new RedissonPromise<Void>(), codec, RedisCommands.CLIENT_REPLY, new Object[]{"OFF"}));
list.add(new CommandData<>(new CompletableFuture<>(), codec, RedisCommands.CLIENT_REPLY, new Object[]{"OFF"}));
}
list.add(new CommandData<>(attemptPromise, codec, command, params));
if (options.isSkipResult()) {
list.add(new CommandData<>(new RedissonPromise<Void>(), codec, RedisCommands.CLIENT_REPLY, new Object[]{"ON"}));
list.add(new CommandData<>(new CompletableFuture<>(), codec, RedisCommands.CLIENT_REPLY, new Object[]{"ON"}));
}
if (options.getSyncSlaves() > 0) {
BatchCommandData<?, ?> waitCommand = new BatchCommandData(RedisCommands.WAIT,

@ -29,9 +29,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.config.*;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -577,31 +575,26 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return toURI(scheme, addr.getAddress().getHostAddress(), "" + addr.getPort());
}
private RFuture<Void> addSlave(RedisURI uri) {
RPromise<Void> result = new RedissonPromise<Void>();
private CompletableFuture<Void> addSlave(RedisURI uri) {
// to avoid addition twice
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
if (!entry.hasSlave(uri) && !config.checkSkipSlavesInit()) {
CompletableFuture<Void> future = entry.addSlave(uri);
future.whenComplete((res, e) -> {
return future.whenComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
log.error("Can't add slave: " + uri, e);
return;
}
if (entry.isSlaveUnfreezed(uri) || entry.slaveUp(uri, FreezeReason.MANAGER)) {
log.info("slave: {} added", uri);
result.trySuccess(null);
}
});
} else {
if (entry.hasSlave(uri)) {
slaveUp(uri);
}
result.trySuccess(null);
}
return result;
if (entry.hasSlave(uri)) {
slaveUp(uri);
}
return CompletableFuture.completedFuture(null);
}
private void slaveDown(RedisURI uri) {

@ -51,7 +51,7 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
throw new IllegalStateException();
}
service.unsubscribe(PubSubType.UNSUBSCRIBE, new ChannelName(channelName))
.onComplete((r, e) -> {
.whenComplete((r, e) -> {
semaphore.release();
});
} else {

@ -303,7 +303,7 @@ public class PublishSubscribeService {
}
if (!connEntry.hasListeners(channelName)) {
unsubscribe(type, channelName)
.onComplete((r, ex) -> {
.whenComplete((r, ex) -> {
lock.release();
});
} else {
@ -386,14 +386,14 @@ public class PublishSubscribeService {
});
}
public RFuture<Void> unsubscribe(PubSubType topicType, ChannelName channelName) {
public CompletableFuture<Void> unsubscribe(PubSubType topicType, ChannelName channelName) {
PubSubConnectionEntry entry = name2PubSubConnection.remove(createKey(channelName));
if (entry == null || connectionManager.isShuttingDown()) {
return RedissonPromise.newSucceededFuture(null);
return CompletableFuture.completedFuture(null);
}
AtomicBoolean executed = new AtomicBoolean();
RedissonPromise<Void> result = new RedissonPromise<>();
CompletableFuture<Void> result = new CompletableFuture<>();
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@Override
@ -406,7 +406,7 @@ public class PublishSubscribeService {
msEntry.returnPubSubConnection(entry.getConnection());
}
result.trySuccess(null);
result.complete(null);
return true;
}
return false;
@ -644,7 +644,7 @@ public class PublishSubscribeService {
entry.removeListener(channelName, listener);
if (!entry.hasListeners(channelName)) {
unsubscribe(type, channelName)
.onComplete((r, ex) -> {
.whenComplete((r, ex) -> {
if (counter.decrementAndGet() == 0) {
semaphore.release();
promise.complete(null);
@ -686,7 +686,7 @@ public class PublishSubscribeService {
}
if (!entry.hasListeners(channelName)) {
unsubscribe(type, channelName)
.onComplete((r, ex) -> {
.whenComplete((r, ex) -> {
if (counter.decrementAndGet() == 0) {
semaphore.release();
promise.complete(null);

Loading…
Cancel
Save