Batch executor optimization

pull/365/head
Nikita 9 years ago
parent 396fa641f2
commit 6454ec28d4

@ -413,7 +413,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
});
}
}
private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final RedisConnection connection) {
@ -492,25 +491,31 @@ public class CommandAsyncService implements CommandAsyncExecutor {
});
}
if (details.getAttemptPromise().isDone()) {
releaseConnection(source, details, connection);
releaseConnection(source, details.getConnectionFuture(), details.isReadOnlyMode(), details.getAttemptPromise());
}
protected <R> void releaseConnection(final NodeSource source, final Future<RedisConnection> connectionFuture,
final boolean isReadOnly, Promise<R> attemptPromise) {
if (attemptPromise.isDone()) {
releaseConnection(isReadOnly, source, connectionFuture);
} else {
details.getAttemptPromise().addListener(new FutureListener<R>() {
attemptPromise.addListener(new FutureListener<R>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<R> future) throws Exception {
releaseConnection(source, details, connection);
releaseConnection(isReadOnly, source, connectionFuture);
}
});
}
}
private <V, R> void releaseConnection(NodeSource source, AsyncDetails<V, R> details, RedisConnection connection) {
if (!details.getConnectionFuture().isSuccess()) {
private <V, R> void releaseConnection(boolean isReadOnly, NodeSource source, Future<RedisConnection> connectionFuture) {
if (!connectionFuture.isSuccess()) {
return;
}
RedisConnection connection = connectionFuture.getNow();
connectionManager.getShutdownLatch().release();
if (details.isReadOnlyMode()) {
if (isReadOnly) {
connectionManager.releaseRead(source, connection);
} else {
connectionManager.releaseWrite(source, connection);

@ -208,9 +208,6 @@ public class CommandBatchService extends CommandReactiveService {
final Promise<Void> attemptPromise = connectionManager.newPromise();
final AsyncDetails details = new AsyncDetails();
// final AtomicReference<ChannelFuture> writeFutureRef = new AtomicReference<ChannelFuture>();
// final AtomicReference<RedisException> exceptionRef = new AtomicReference<RedisException>();
// final AtomicReference<Timeout> timeoutRef = new AtomicReference<Timeout>();
final Future<RedisConnection> connectionFuture;
if (entry.isReadOnlyMode()) {
@ -261,85 +258,22 @@ public class CommandBatchService extends CommandReactiveService {
Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) {
return;
}
if (!connFuture.isSuccess()) {
details.setException(convertException(connFuture));
return;
}
final RedisConnection connection = connFuture.getNow();
if (source.getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size() + 1);
Promise<Void> promise = connectionManager.newPromise();
list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {}));
for (CommandEntry c : entry.getCommands()) {
list.add(c.getCommand());
}
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list));
details.setWriteFuture(future);
} else {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size());
FutureListener<Object> listener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess() && !mainPromise.isDone()) {
mainPromise.setFailure(future.cause());
}
}
};
for (CommandEntry c : entry.getCommands()) {
c.getCommand().getPromise().addListener(listener);
list.add(c.getCommand());
}
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list));
details.setWriteFuture(future);
}
details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (attemptPromise.isDone() || future.isCancelled()) {
return;
}
if (!future.isSuccess()) {
details.setException(new WriteRedisConnectionException("Can't write command batch to channel: " + future.channel(), future.cause()));
} else {
details.getTimeout().cancel();
TimerTask timeoutTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
attemptPromise.tryFailure(
new RedisTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel()));
}
};
Timeout timeout = connectionManager.newTimeout(timeoutTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
}
}
});
if (entry.isReadOnlyMode()) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection));
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection));
if (connectionFuture.isDone()) {
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture);
} else {
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connFuture);
}
}
});
});
}
attemptPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
details.getTimeout().cancel();
if (future.isCancelled() || mainPromise.isDone()) {
if (future.isCancelled()) {
return;
}
@ -369,4 +303,82 @@ public class CommandBatchService extends CommandReactiveService {
});
}
private void checkWriteFuture(final Promise<Void> attemptPromise, AsyncDetails details,
final RedisConnection connection, ChannelFuture future) {
if (attemptPromise.isDone() || future.isCancelled()) {
return;
}
if (!future.isSuccess()) {
details.setException(new WriteRedisConnectionException("Can't write command batch to channel: " + future.channel(), future.cause()));
} else {
details.getTimeout().cancel();
TimerTask timeoutTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
attemptPromise.tryFailure(
new RedisTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel()));
}
};
Timeout timeout = connectionManager.newTimeout(timeoutTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
}
}
private void checkConnectionFuture(final Entry entry, final NodeSource source,
final Promise<Void> mainPromise, final Promise<Void> attemptPromise, final AsyncDetails details,
Future<RedisConnection> connFuture) {
if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) {
return;
}
if (!connFuture.isSuccess()) {
details.setException(convertException(connFuture));
return;
}
final RedisConnection connection = connFuture.getNow();
if (source.getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size() + 1);
Promise<Void> promise = connectionManager.newPromise();
list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {}));
for (CommandEntry c : entry.getCommands()) {
list.add(c.getCommand());
}
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list));
details.setWriteFuture(future);
} else {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size());
FutureListener<Object> listener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess() && !mainPromise.isDone()) {
mainPromise.setFailure(future.cause());
}
}
};
for (CommandEntry c : entry.getCommands()) {
c.getCommand().getPromise().addListener(listener);
list.add(c.getCommand());
}
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list));
details.setWriteFuture(future);
}
if (details.getWriteFuture().isDone()) {
checkWriteFuture(attemptPromise, details, connection, details.getWriteFuture());
} else {
details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(attemptPromise, details, connection, future);
}
});
}
releaseConnection(source, connFuture, entry.isReadOnlyMode(), attemptPromise);
}
}

@ -34,7 +34,6 @@ import io.netty.channel.EventLoopGroup;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
/**
@ -84,12 +83,6 @@ public interface ConnectionManager {
Future<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command);
@Deprecated
<T> FutureListener<T> createReleaseReadListener(NodeSource source, RedisConnection conn);
@Deprecated
<T> FutureListener<T> createReleaseWriteListener(NodeSource source, RedisConnection conn);
RedisClient createClient(String host, int port, int timeout);
RedisClient createClient(String host, int port);

@ -217,36 +217,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return new RedisClient(group, socketChannelClass, host, port, timeout);
}
@Override
public <T> FutureListener<T> createReleaseWriteListener(final NodeSource source, final RedisConnection conn) {
return new FutureListener<T>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
if (future.isCancelled()) {
return;
}
shutdownLatch.release();
releaseWrite(source, conn);
}
};
}
@Override
public <T> FutureListener<T> createReleaseReadListener(final NodeSource source, final RedisConnection conn) {
return new FutureListener<T>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
if (future.isCancelled()) {
return;
}
shutdownLatch.release();
releaseRead(source, conn);
}
};
}
@Override
public int calcSlot(String key) {
return 0;

Loading…
Cancel
Save