refactoring

pull/4031/head
Nikita Koksharov 3 years ago
parent e38babd8ea
commit 8a3943906a

@ -137,12 +137,12 @@ public class MasterSlaveEntry {
futures.add(masterAsSlaveFuture);
}
RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
futures.add(writeFuture.toCompletableFuture());
CompletableFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
futures.add(writeFuture);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
futures.add(pubSubFuture.toCompletableFuture());
CompletableFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
futures.add(pubSubFuture);
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}).whenComplete((r, e) -> {
@ -326,9 +326,10 @@ public class MasterSlaveEntry {
}
}
return slaveBalancer.add(entry);
}).exceptionally(ex -> {
client.shutdownAsync();
return null;
}).whenComplete((r, ex) -> {
if (ex != null) {
client.shutdownAsync();
}
});
}

@ -458,9 +458,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
CompletableFuture<Void> resolvedFuture = CompletableFuture.allOf(masterAddrFuture.toCompletableFuture(),
slaveAddrFuture.toCompletableFuture());
futures.add(resolvedFuture
.exceptionally(exc -> {
log.error("Unable to resolve addresses " + host + " and/or " + masterHost, exc);
return null;
.whenComplete((r, exc) -> {
if (exc != null) {
log.error("Unable to resolve addresses " + host + " and/or " + masterHost, exc);
}
})
.thenCompose(res -> {
RedisURI slaveAddr = slaveAddrFuture.getNow();
@ -474,9 +475,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
currentSlaves.add(slaveAddr);
return addSlave(slaveAddr).exceptionally(e2 -> {
log.error("Unable to add slave " + slaveAddr, e2);
return null;
return addSlave(slaveAddr).whenComplete((r, e) -> {
if (e != null) {
log.error("Unable to add slave " + slaveAddr, e);
}
});
}));
}

@ -30,7 +30,8 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.connection.pool.SlaveConnectionPool;
import org.redisson.misc.*;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,8 +39,6 @@ import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
/**
*
@ -72,22 +71,17 @@ public class LoadBalancerManager {
}
}
public RFuture<Void> add(final ClientConnectionsEntry entry) {
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, 2) {
@Override
protected void onSuccess(Void value) {
client2Entry.put(entry.getClient(), entry);
}
};
RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);
slaveFuture.onComplete(listener);
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);
pubSubFuture.onComplete(listener);
return result;
public CompletableFuture<Void> add(ClientConnectionsEntry entry) {
List<CompletableFuture<Void>> futures = new ArrayList<>(2);
CompletableFuture<Void> slaveFuture = slaveConnectionPool.add(entry);
futures.add(slaveFuture);
CompletableFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);
futures.add(pubSubFuture);
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return future.thenAccept(r -> {
client2Entry.put(entry.getClient(), entry);
});
}
public Collection<ClientConnectionsEntry> getEntries() {
@ -141,28 +135,21 @@ public class LoadBalancerManager {
if (!entry.isInitialized()) {
entry.setInitialized(true);
AsyncCountDownLatch latch = new AsyncCountDownLatch();
latch.latch(() -> {
entry.setFreezeReason(null);
}, 2);
BiConsumer<Void, Throwable> initCallBack = new BiConsumer<Void, Throwable>() {
private final AtomicBoolean initConnError = new AtomicBoolean(false);
@Override
public void accept(Void r, Throwable ex) {
if (ex == null) {
latch.countDown();
} else {
if (!initConnError.compareAndSet(false, true)) {
return;
}
entry.setInitialized(false);
}
}
};
entry.resetFirstFail();
slaveConnectionPool.initConnections(entry).onComplete(initCallBack);
pubSubConnectionPool.initConnections(entry).onComplete(initCallBack);
List<CompletableFuture<Void>> futures = new ArrayList<>(2);
futures.add(slaveConnectionPool.initConnections(entry));
futures.add(pubSubConnectionPool.initConnections(entry));
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
future.whenComplete((r, e) -> {
if (e != null) {
entry.setInitialized(false);
return;
}
entry.setFreezeReason(null);
});
return true;
}
}

@ -67,48 +67,43 @@ abstract class ConnectionPool<T extends RedisConnection> {
this.connectionManager = connectionManager;
}
public RFuture<Void> add(ClientConnectionsEntry entry) {
RPromise<Void> promise = new RedissonPromise<Void>();
promise.onComplete((r, e) -> {
if (e == null) {
entries.add(entry);
}
public CompletableFuture<Void> add(ClientConnectionsEntry entry) {
CompletableFuture<Void> promise = initConnections(entry, true);
return promise.thenAccept(r -> {
entries.add(entry);
});
initConnections(entry, promise, true);
return promise;
}
public RPromise<Void> initConnections(ClientConnectionsEntry entry) {
RPromise<Void> promise = new RedissonPromise<Void>();
initConnections(entry, promise, false);
return promise;
public CompletableFuture<Void> initConnections(ClientConnectionsEntry entry) {
return initConnections(entry, false);
}
private void initConnections(ClientConnectionsEntry entry, RPromise<Void> initPromise, boolean checkFreezed) {
private CompletableFuture<Void> initConnections(ClientConnectionsEntry entry, boolean checkFreezed) {
int minimumIdleSize = getMinimumIdleSize(entry);
if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) {
initPromise.trySuccess(null);
return;
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> initPromise = new CompletableFuture<>();
AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);
int startAmount = Math.min(10, minimumIdleSize);
AtomicInteger requests = new AtomicInteger(startAmount);
for (int i = 0; i < startAmount; i++) {
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
}
return initPromise;
}
private void createConnection(boolean checkFreezed, AtomicInteger requests, ClientConnectionsEntry entry, RPromise<Void> initPromise,
int minimumIdleSize, AtomicInteger initializedConnections) {
private void createConnection(boolean checkFreezed, AtomicInteger requests, ClientConnectionsEntry entry,
CompletableFuture<Void> initPromise, int minimumIdleSize, AtomicInteger initializedConnections) {
if ((checkFreezed && entry.isFreezed()) || !tryAcquireConnection(entry)) {
int totalInitializedConnections = minimumIdleSize - initializedConnections.get();
Throwable cause = new RedisConnectionException(
"Unable to init enough connections amount! Only " + totalInitializedConnections + " of " + minimumIdleSize + " were initialized. Server: "
+ entry.getClient().getAddr());
initPromise.tryFailure(cause);
initPromise.completeExceptionally(cause);
return;
}
@ -157,13 +152,13 @@ abstract class ConnectionPool<T extends RedisConnection> {
+ " of " + minimumIdleSize + " were initialized. Redis server: " + entry.getClient().getAddr();
}
Throwable cause = new RedisConnectionException(errorMsg, e);
initPromise.tryFailure(cause);
initPromise.completeExceptionally(cause);
return;
}
int value = initializedConnections.decrementAndGet();
if (value == 0) {
if (initPromise.trySuccess(null)) {
if (initPromise.complete(null)) {
log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());
}
} else if (value > 0 && !initPromise.isDone()) {

Loading…
Cancel
Save