refactoring

pull/4031/head
Nikita Koksharov 3 years ago
parent 79971af003
commit 39c1b70b53

@ -152,7 +152,7 @@ public final class RedisClient {
return connectAsync().join();
} catch (CompletionException e) {
if (e.getCause() instanceof RedisException) {
throw e;
throw (RedisException) e.getCause();
} else {
throw new RedisConnectionException("Unable to connect to: " + uri, e);
}
@ -245,7 +245,7 @@ public final class RedisClient {
return connectPubSubAsync().join();
} catch (CompletionException e) {
if (e.getCause() instanceof RedisException) {
throw e;
throw (RedisException) e.getCause();
} else {
throw new RedisConnectionException("Unable to connect to: " + uri, e);
}

@ -42,10 +42,7 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
@ -91,9 +88,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
List<String> failedMasters = new ArrayList<String>();
for (String address : cfg.getNodeAddresses()) {
RedisURI addr = new RedisURI(address);
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
CompletableFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
try {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
RedisConnection connection = connectionFuture.join();
if (cfg.getNodeAddresses().size() == 1 && !addr.isIP()) {
configEndpointHostName = addr.getHost();
@ -116,7 +113,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
RFuture<Collection<ClusterPartition>> partitionsFuture = parsePartitions(nodes);
Collection<ClusterPartition> partitions = partitionsFuture.syncUninterruptibly().getNow();
List<RFuture<Void>> masterFutures = new ArrayList<>();
List<CompletableFuture<Void>> masterFutures = new ArrayList<>();
for (ClusterPartition partition : partitions) {
if (partition.isMasterFail()) {
failedMasters.add(partition.getMasterAddress().toString());
@ -126,18 +123,21 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
throw new IllegalStateException("Master node: " + partition.getNodeId() + " doesn't have address.");
}
RFuture<Void> masterFuture = addMasterEntry(partition, cfg);
CompletableFuture<Void> masterFuture = addMasterEntry(partition, cfg);
masterFutures.add(masterFuture);
}
for (RFuture<Void> masterFuture : masterFutures) {
masterFuture.awaitUninterruptibly();
if (!masterFuture.isSuccess()) {
lastException = masterFuture.cause();
}
CompletableFuture<Void> masterFuture = CompletableFuture.allOf(masterFutures.toArray(new CompletableFuture[0]));
try {
masterFuture.join();
} catch (CompletionException e) {
lastException = e.getCause();
}
break;
} catch (Exception e) {
if (e instanceof CompletionException) {
e = (Exception) e.getCause();
}
lastException = e;
log.warn(e.getMessage());
}
@ -262,7 +262,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return result;
}
private RFuture<Void> addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) {
private CompletableFuture<Void> addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) {
CompletableFuture<Void> result = new CompletableFuture<>();
if (partition.isMasterFail()) {
RedisException e = new RedisException("Failed to add master: " +
partition.getMasterAddress() + " for slot ranges: " +
@ -272,15 +274,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
e = new RedisException("Failed to add master: " +
partition.getMasterAddress() + ". Reason - server has FAIL flag");
}
return RedissonPromise.newFailedFuture(e);
result.completeExceptionally(e);
return result;
}
RPromise<Void> result = new RedissonPromise<>();
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName);
connectionFuture.onComplete((connection, ex1) -> {
CompletableFuture<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName);
connectionFuture.whenComplete((connection, ex1) -> {
if (ex1 != null) {
log.error("Can't connect to master: {} with slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
result.tryFailure(ex1);
result.completeExceptionally(ex1);
return;
}
@ -301,7 +303,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
f.whenComplete((masterClient, ex3) -> {
if (ex3 != null) {
log.error("Can't add master: " + partition.getMasterAddress() + " for slot ranges: " + partition.getSlotRanges(), ex3);
result.tryFailure(ex3);
result.completeExceptionally(ex3);
return;
}
@ -316,7 +318,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (ex != null) {
log.error("unable to add slave for: " + partition.getMasterAddress()
+ " slot ranges: " + partition.getSlotRanges(), ex);
result.tryFailure(ex);
result.completeExceptionally(ex);
return;
}
@ -327,14 +329,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
if (result.trySuccess(null)) {
if (result.complete(null)) {
log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
} else {
log.error("unable to add master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
}
});
} else {
if (result.trySuccess(null)) {
if (result.complete(null)) {
log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
} else {
log.error("unable to add master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
@ -417,8 +419,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return;
}
RedisURI uri = iterator.next();
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, uri, configEndpointHostName);
connectionFuture.onComplete((connection, e) -> {
CompletableFuture<RedisConnection> connectionFuture = connectToNode(cfg, uri, configEndpointHostName);
connectionFuture.whenComplete((connection, e) -> {
if (e != null) {
lastException.set(e);
getShutdownLatch().release();
@ -602,8 +604,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
RPromise<Void> result = new RedissonPromise<>();
AtomicInteger masters = new AtomicInteger(addedPartitions.size());
for (ClusterPartition newPart : addedPartitions.values()) {
RFuture<Void> future = addMasterEntry(newPart, cfg);
future.onComplete((res, e) -> {
CompletionStage<Void> future = addMasterEntry(newPart, cfg);
future.whenComplete((res, e) -> {
if (masters.decrementAndGet() == 0) {
result.trySuccess(null);
}

@ -231,29 +231,23 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
protected final RFuture<RedisConnection> connectToNode(BaseConfig<?> cfg, RedisURI addr, String sslHostname) {
protected final CompletableFuture<RedisConnection> connectToNode(BaseConfig<?> cfg, RedisURI addr, String sslHostname) {
return connectToNode(NodeType.MASTER, cfg, addr, sslHostname);
}
protected final RFuture<RedisConnection> connectToNode(NodeType type, BaseConfig<?> cfg, RedisURI addr, String sslHostname) {
protected final CompletableFuture<RedisConnection> connectToNode(NodeType type, BaseConfig<?> cfg, RedisURI addr, String sslHostname) {
RedisConnection conn = nodeConnections.get(addr);
if (conn != null) {
if (!conn.isActive()) {
closeNodeConnection(conn);
} else {
return RedissonPromise.newSucceededFuture(conn);
return CompletableFuture.completedFuture(conn);
}
}
RedisClient client = createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname);
RPromise<RedisConnection> result = new RedissonPromise<>();
CompletableFuture<RedisConnection> future = client.connectAsync();
future.whenComplete((connection, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
return future.thenCompose(connection -> {
if (connection.isActive()) {
if (!addr.isIP()) {
RedisURI address = new RedisURI(addr.getScheme()
@ -262,14 +256,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
nodeConnections.put(address, connection);
}
nodeConnections.put(addr, connection);
result.trySuccess(connection);
return CompletableFuture.completedFuture(connection);
} else {
connection.closeAsync();
result.tryFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));
CompletableFuture<RedisConnection> f = new CompletableFuture<>();
f.completeExceptionally(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));
return f;
}
});
return result;
}
@Override

@ -71,9 +71,13 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
for (String address : cfg.getNodeAddresses()) {
RedisURI addr = new RedisURI(address);
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
connectionFuture.awaitUninterruptibly();
RedisConnection connection = connectionFuture.getNow();
CompletableFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
RedisConnection connection = null;
try {
connection = connectionFuture.join();
} catch (Exception e) {
// skip
}
if (connection == null) {
continue;
}
@ -157,8 +161,8 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
}
private void checkNode(AsyncCountDownLatch latch, RedisURI uri, ReplicatedServersConfig cfg, Set<InetSocketAddress> slaveIPs) {
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, uri, uri.getHost());
connectionFuture.onComplete((connection, exc) -> {
CompletableFuture<RedisConnection> connectionFuture = connectToNode(cfg, uri, uri.getHost());
connectionFuture.whenComplete((connection, exc) -> {
if (exc != null) {
log.error(exc.getMessage(), exc);
latch.countDown();

@ -29,7 +29,6 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.config.*;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.AsyncCountDownLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
@ -41,10 +40,8 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
@ -341,8 +338,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
RedisClient client = iterator.next();
RedisURI addr = toURI(client.getAddr());
RFuture<RedisConnection> connectionFuture = connectToNode(NodeType.SENTINEL, cfg, addr, null);
connectionFuture.onComplete((connection, e) -> {
CompletableFuture<RedisConnection> connectionFuture = connectToNode(NodeType.SENTINEL, cfg, addr, null);
connectionFuture.whenComplete((connection, e) -> {
if (e != null) {
lastException.set(e);
getShutdownLatch().release();
@ -356,54 +353,46 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
private void updateState(SentinelServersConfig cfg, RedisConnection connection, Iterator<RedisClient> iterator) {
AtomicInteger commands = new AtomicInteger(2);
BiConsumer<Object, Throwable> commonListener = new BiConsumer<Object, Throwable>() {
private final AtomicBoolean failed = new AtomicBoolean();
@Override
public void accept(Object t, Throwable u) {
if (commands.decrementAndGet() == 0) {
getShutdownLatch().release();
if (failed.get()) {
scheduleChangeCheck(cfg, iterator);
} else {
scheduleChangeCheck(cfg, null);
}
}
if (u != null && failed.compareAndSet(false, true)) {
log.error("Can't execute SENTINEL commands on " + connection.getRedisClient().getAddr(), u);
closeNodeConnection(connection);
}
}
};
RFuture<RedisURI> masterFuture = checkMasterChange(cfg, connection);
masterFuture.onComplete(commonListener);
List<CompletableFuture<?>> futures = new ArrayList<>();
CompletionStage<RedisURI> masterFuture = checkMasterChange(cfg, connection);
futures.add(masterFuture.toCompletableFuture());
if (!config.checkSkipSlavesInit()) {
commands.incrementAndGet();
RFuture<List<Map<String, String>>> slavesFuture = checkSlavesChange(cfg, connection);
slavesFuture.onComplete(commonListener);
CompletionStage<Void> slavesFuture = checkSlavesChange(cfg, connection);
futures.add(slavesFuture.toCompletableFuture());
}
RFuture<List<Map<String, String>>> sentinelsFuture = checkSentinelsChange(cfg, connection);
sentinelsFuture.onComplete(commonListener);
CompletionStage<Void> sentinelsFuture = checkSentinelsChange(cfg, connection);
futures.add(sentinelsFuture.toCompletableFuture());
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
future.whenComplete((r, e) -> {
if (e != null) {
log.error("Can't execute SENTINEL commands on " + connection.getRedisClient().getAddr(), e);
closeNodeConnection(connection);
}
getShutdownLatch().release();
if (e != null) {
scheduleChangeCheck(cfg, iterator);
} else {
scheduleChangeCheck(cfg, null);
}
});
}
private RFuture<List<Map<String, String>>> checkSentinelsChange(SentinelServersConfig cfg, RedisConnection connection) {
private CompletionStage<Void> checkSentinelsChange(SentinelServersConfig cfg, RedisConnection connection) {
if (!cfg.isSentinelsDiscovery()) {
return RedissonPromise.newSucceededFuture(null);
return CompletableFuture.completedFuture(null);
}
RFuture<List<Map<String, String>>> sentinelsFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName());
sentinelsFuture.onComplete((list, e) -> {
if (e != null || list.isEmpty()) {
return;
return sentinelsFuture.thenCompose(list -> {
if (list.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
AsyncCountDownLatch latch = new AsyncCountDownLatch();
List<RFuture<RedisURI>> newUris = list.stream().filter(m -> {
List<CompletableFuture<RedisURI>> newUris = list.stream().filter(m -> {
String flags = m.getOrDefault("flags", "");
String masterLinkStatus = m.getOrDefault("master-link-status", "");
if (!m.isEmpty() && !isSlaveDown(flags, masterLinkStatus)) {
@ -415,40 +404,39 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String port = m.get("port");
return toURI(ip, port);
}).map(addr -> {
RFuture<RedisURI> f = resolveIP(addr);
f.onComplete((res, ex) -> {
if (ex != null) {
log.error("unable to resolve hostname", ex);
}
latch.countDown();
});
return f;
CompletionStage<RedisURI> f = resolveIP(addr);
return f.exceptionally(ex -> {
log.error("unable to resolve hostname", ex);
return null;
}).toCompletableFuture();
}).collect(Collectors.toList());
latch.latch(() -> {
List<RedisURI> uris = newUris.stream().map(u -> u.getNow()).filter(u -> u != null).collect(Collectors.toList());
CompletableFuture<Void> futures = CompletableFuture.allOf(newUris.toArray(new CompletableFuture[0]));
return futures.whenComplete((r, ex) -> {
List<RedisURI> uris = newUris.stream().map(u -> {
try {
return u.getNow(null);
} catch (Exception exc) {
return null;
}
}).filter(u -> u != null).collect(Collectors.toList());
InetSocketAddress addr = connection.getRedisClient().getAddr();
RedisURI currentAddr = toURI(addr);
uris.add(currentAddr);
updateSentinels(uris);
}, newUris.size());
});
});
return sentinelsFuture;
}
private RFuture<List<Map<String, String>>> checkSlavesChange(SentinelServersConfig cfg, RedisConnection connection) {
private CompletionStage<Void> checkSlavesChange(SentinelServersConfig cfg, RedisConnection connection) {
RFuture<List<Map<String, String>>> slavesFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
slavesFuture.onComplete((slavesMap, ex) -> {
if (ex != null) {
return;
}
return slavesFuture.thenCompose(slavesMap -> {
Set<RedisURI> currentSlaves = Collections.newSetFromMap(new ConcurrentHashMap<>(slavesMap.size()));
AsyncCountDownLatch latch = new AsyncCountDownLatch();
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (Map<String, String> map : slavesMap) {
if (map.isEmpty()) {
latch.countDown();
continue;
}
@ -466,52 +454,48 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} else {
masterAddrFuture = resolveIP(masterHost, masterPort);
}
CompletableFuture<Void> resolvedFuture = CompletableFuture.allOf(masterAddrFuture.toCompletableFuture(),
slaveAddrFuture.toCompletableFuture());
resolvedFuture.whenComplete((res, exc) -> {
if (exc != null) {
log.error("Unable to resolve addresses " + host + " and/or " + masterHost, exc);
latch.countDown();
return;
}
RedisURI slaveAddr = slaveAddrFuture.getNow();
RedisURI masterAddr = masterAddrFuture.getNow();
if (isSlaveDown(flags, masterLinkStatus)) {
slaveDown(slaveAddr);
latch.countDown();
return;
}
if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterAddr)) {
latch.countDown();
return;
}
currentSlaves.add(slaveAddr);
addSlave(slaveAddr).onComplete((r, e2) -> {
latch.countDown();
if (e2 != null) {
log.error("Unable to add slave " + slaveAddr, e2);
}
});
});
futures.add(resolvedFuture
.exceptionally(exc -> {
log.error("Unable to resolve addresses " + host + " and/or " + masterHost, exc);
return null;
})
.thenCompose(res -> {
RedisURI slaveAddr = slaveAddrFuture.getNow();
RedisURI masterAddr = masterAddrFuture.getNow();
if (isSlaveDown(flags, masterLinkStatus)) {
slaveDown(slaveAddr);
return CompletableFuture.completedFuture(res);
}
if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterAddr)) {
return CompletableFuture.completedFuture(res);
}
currentSlaves.add(slaveAddr);
return addSlave(slaveAddr).exceptionally(e2 -> {
log.error("Unable to add slave " + slaveAddr, e2);
return null;
});
}));
}
latch.latch(() -> {
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return future.whenComplete((r, exc) -> {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
entry.getAllEntries().stream()
.map(e -> e.getClient().getAddr())
.map(a -> toURI(a.getAddress().getHostAddress(), String.valueOf(a.getPort())))
.filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get()))
.forEach(a -> slaveDown(a));
}, slavesMap.size());
});
});
return slavesFuture;
}
private RFuture<RedisURI> checkMasterChange(SentinelServersConfig cfg, RedisConnection connection) {
private CompletionStage<RedisURI> checkMasterChange(SentinelServersConfig cfg, RedisConnection connection) {
RFuture<RedisURI> masterFuture = connection.async(StringCodec.INSTANCE, masterHostCommand, cfg.getMasterName());
masterFuture.thenCompose(u -> resolveIP(scheme, u))
return masterFuture.thenCompose(u -> resolveIP(scheme, u))
.whenComplete((newMaster, e) -> {
if (e != null) {
return;
@ -527,7 +511,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
});
}
});
return masterFuture;
}
private void updateSentinels(Collection<RedisURI> newUris) {

Loading…
Cancel
Save