refactoring

pull/5045/head
Nikita Koksharov 2 years ago
parent f4e7419350
commit bae032a997

@ -54,13 +54,17 @@ public class CommandBatchService extends CommandAsyncService {
public static class ConnectionEntry {
boolean firstCommand = true;
volatile CompletableFuture<RedisConnection> connectionFuture;
final CompletableFuture<RedisConnection> connectionFuture;
Runnable cancelCallback;
public ConnectionEntry(CompletableFuture<RedisConnection> connectionFuture) {
this.connectionFuture = connectionFuture;
}
public CompletableFuture<RedisConnection> getConnectionFuture() {
return connectionFuture;
}
public boolean isFirstCommand() {
return firstCommand;
}

@ -124,9 +124,12 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
sentPromise.completeExceptionally(cause);
mainPromise.completeExceptionally(cause);
if (executed.compareAndSet(false, true)) {
getNow(connectionFuture).forceFastReconnectAsync().whenComplete((res, e) -> {
RedisQueuedBatchExecutor.super.releaseConnection(mainPromise, connectionFuture);
});
RedisConnection c = getNow(connectionFuture);
if (c != null) {
c.forceFastReconnectAsync().whenComplete((res, e) -> {
RedisQueuedBatchExecutor.super.releaseConnection(mainPromise, connectionFuture);
});
}
}
return;
}
@ -200,29 +203,23 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
}
}
private static final AtomicReferenceFieldUpdater<ConnectionEntry, CompletableFuture> CONNECTION_LATCH =
AtomicReferenceFieldUpdater.newUpdater(ConnectionEntry.class,
CompletableFuture.class, "connectionFuture");
@Override
protected CompletableFuture<RedisConnection> getConnection() {
MasterSlaveEntry msEntry = getEntry();
ConnectionEntry entry = connections.computeIfAbsent(msEntry, k -> new ConnectionEntry());
if (CONNECTION_LATCH.compareAndSet(entry, null, new CompletableFuture<>())) {
connectionFuture = entry.getConnectionFuture();
CompletableFuture<RedisConnection> cf;
ConnectionEntry entry = connections.computeIfAbsent(msEntry, k -> {
if (this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) {
cf = connectionWriteOp(null);
connectionFuture = connectionWriteOp(null);
} else {
cf = connectionReadOp(null);
connectionFuture = connectionReadOp(null);
}
connectionManager.getServiceManager().transfer(cf, connectionFuture);
entry.setCancelCallback(() -> {
ConnectionEntry ce = new ConnectionEntry(connectionFuture);
ce.setCancelCallback(() -> {
handleError(connectionFuture, new CancellationException());
});
}
return ce;
});
return entry.getConnectionFuture();
}

Loading…
Cancel
Save