From 6454ec28d44a519deb634559bc9569256ff3eab3 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 Dec 2015 13:15:56 +0300 Subject: [PATCH] Batch executor optimization --- .../redisson/command/CommandAsyncService.java | 21 ++- .../redisson/command/CommandBatchService.java | 164 ++++++++++-------- .../connection/ConnectionManager.java | 7 - .../MasterSlaveConnectionManager.java | 30 ---- 4 files changed, 101 insertions(+), 121 deletions(-) diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index cc1259964..51683d586 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -413,7 +413,6 @@ public class CommandAsyncService implements CommandAsyncExecutor { } }); } - } private void checkWriteFuture(final AsyncDetails details, final RedisConnection connection) { @@ -492,25 +491,31 @@ public class CommandAsyncService implements CommandAsyncExecutor { }); } - if (details.getAttemptPromise().isDone()) { - releaseConnection(source, details, connection); + releaseConnection(source, details.getConnectionFuture(), details.isReadOnlyMode(), details.getAttemptPromise()); + } + + protected void releaseConnection(final NodeSource source, final Future connectionFuture, + final boolean isReadOnly, Promise attemptPromise) { + if (attemptPromise.isDone()) { + releaseConnection(isReadOnly, source, connectionFuture); } else { - details.getAttemptPromise().addListener(new FutureListener() { + attemptPromise.addListener(new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { - releaseConnection(source, details, connection); + releaseConnection(isReadOnly, source, connectionFuture); } }); } } - private void releaseConnection(NodeSource source, AsyncDetails details, RedisConnection connection) { - if (!details.getConnectionFuture().isSuccess()) { + private void releaseConnection(boolean isReadOnly, NodeSource source, Future connectionFuture) { + if (!connectionFuture.isSuccess()) { return; } + RedisConnection connection = connectionFuture.getNow(); connectionManager.getShutdownLatch().release(); - if (details.isReadOnlyMode()) { + if (isReadOnly) { connectionManager.releaseRead(source, connection); } else { connectionManager.releaseWrite(source, connection); diff --git a/src/main/java/org/redisson/command/CommandBatchService.java b/src/main/java/org/redisson/command/CommandBatchService.java index c3a3589a5..514c12e32 100644 --- a/src/main/java/org/redisson/command/CommandBatchService.java +++ b/src/main/java/org/redisson/command/CommandBatchService.java @@ -208,9 +208,6 @@ public class CommandBatchService extends CommandReactiveService { final Promise attemptPromise = connectionManager.newPromise(); final AsyncDetails details = new AsyncDetails(); -// final AtomicReference writeFutureRef = new AtomicReference(); -// final AtomicReference exceptionRef = new AtomicReference(); -// final AtomicReference timeoutRef = new AtomicReference(); final Future connectionFuture; if (entry.isReadOnlyMode()) { @@ -261,85 +258,22 @@ public class CommandBatchService extends CommandReactiveService { Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); details.setTimeout(timeout); - connectionFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future connFuture) throws Exception { - if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) { - return; - } - - if (!connFuture.isSuccess()) { - details.setException(convertException(connFuture)); - return; - } - - final RedisConnection connection = connFuture.getNow(); - - if (source.getRedirect() == Redirect.ASK) { - List> list = new ArrayList>(entry.getCommands().size() + 1); - Promise promise = connectionManager.newPromise(); - list.add(new CommandData(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {})); - for (CommandEntry c : entry.getCommands()) { - list.add(c.getCommand()); - } - ChannelFuture future = connection.send(new CommandsData(attemptPromise, list)); - details.setWriteFuture(future); - } else { - List> list = new ArrayList>(entry.getCommands().size()); - FutureListener listener = new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess() && !mainPromise.isDone()) { - mainPromise.setFailure(future.cause()); - } - } - }; - - for (CommandEntry c : entry.getCommands()) { - c.getCommand().getPromise().addListener(listener); - list.add(c.getCommand()); - } - ChannelFuture future = connection.send(new CommandsData(attemptPromise, list)); - details.setWriteFuture(future); - } - - details.getWriteFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (attemptPromise.isDone() || future.isCancelled()) { - return; - } - - if (!future.isSuccess()) { - details.setException(new WriteRedisConnectionException("Can't write command batch to channel: " + future.channel(), future.cause())); - } else { - details.getTimeout().cancel(); - TimerTask timeoutTask = new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - attemptPromise.tryFailure( - new RedisTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel())); - } - }; - Timeout timeout = connectionManager.newTimeout(timeoutTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); - details.setTimeout(timeout); - } - } - }); - - if (entry.isReadOnlyMode()) { - attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection)); - } else { - attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection)); + if (connectionFuture.isDone()) { + checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture); + } else { + connectionFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future connFuture) throws Exception { + checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connFuture); } - } - }); + }); + } attemptPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { details.getTimeout().cancel(); - if (future.isCancelled() || mainPromise.isDone()) { + if (future.isCancelled()) { return; } @@ -369,4 +303,82 @@ public class CommandBatchService extends CommandReactiveService { }); } + private void checkWriteFuture(final Promise attemptPromise, AsyncDetails details, + final RedisConnection connection, ChannelFuture future) { + if (attemptPromise.isDone() || future.isCancelled()) { + return; + } + + if (!future.isSuccess()) { + details.setException(new WriteRedisConnectionException("Can't write command batch to channel: " + future.channel(), future.cause())); + } else { + details.getTimeout().cancel(); + TimerTask timeoutTask = new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + attemptPromise.tryFailure( + new RedisTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel())); + } + }; + Timeout timeout = connectionManager.newTimeout(timeoutTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); + details.setTimeout(timeout); + } + } + + private void checkConnectionFuture(final Entry entry, final NodeSource source, + final Promise mainPromise, final Promise attemptPromise, final AsyncDetails details, + Future connFuture) { + if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) { + return; + } + + if (!connFuture.isSuccess()) { + details.setException(convertException(connFuture)); + return; + } + + final RedisConnection connection = connFuture.getNow(); + + if (source.getRedirect() == Redirect.ASK) { + List> list = new ArrayList>(entry.getCommands().size() + 1); + Promise promise = connectionManager.newPromise(); + list.add(new CommandData(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {})); + for (CommandEntry c : entry.getCommands()) { + list.add(c.getCommand()); + } + ChannelFuture future = connection.send(new CommandsData(attemptPromise, list)); + details.setWriteFuture(future); + } else { + List> list = new ArrayList>(entry.getCommands().size()); + FutureListener listener = new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess() && !mainPromise.isDone()) { + mainPromise.setFailure(future.cause()); + } + } + }; + + for (CommandEntry c : entry.getCommands()) { + c.getCommand().getPromise().addListener(listener); + list.add(c.getCommand()); + } + ChannelFuture future = connection.send(new CommandsData(attemptPromise, list)); + details.setWriteFuture(future); + } + + if (details.getWriteFuture().isDone()) { + checkWriteFuture(attemptPromise, details, connection, details.getWriteFuture()); + } else { + details.getWriteFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + checkWriteFuture(attemptPromise, details, connection, future); + } + }); + } + + releaseConnection(source, connFuture, entry.isReadOnlyMode(), attemptPromise); + } + } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index d52ab2142..15e5b5227 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -34,7 +34,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; /** @@ -84,12 +83,6 @@ public interface ConnectionManager { Future connectionWriteOp(NodeSource source, RedisCommand command); - @Deprecated - FutureListener createReleaseReadListener(NodeSource source, RedisConnection conn); - - @Deprecated - FutureListener createReleaseWriteListener(NodeSource source, RedisConnection conn); - RedisClient createClient(String host, int port, int timeout); RedisClient createClient(String host, int port); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 4588a319d..df866385e 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -217,36 +217,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return new RedisClient(group, socketChannelClass, host, port, timeout); } - @Override - public FutureListener createReleaseWriteListener(final NodeSource source, final RedisConnection conn) { - return new FutureListener() { - @Override - public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { - if (future.isCancelled()) { - return; - } - - shutdownLatch.release(); - releaseWrite(source, conn); - } - }; - } - - @Override - public FutureListener createReleaseReadListener(final NodeSource source, final RedisConnection conn) { - return new FutureListener() { - @Override - public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { - if (future.isCancelled()) { - return; - } - - shutdownLatch.release(); - releaseRead(source, conn); - } - }; - } - @Override public int calcSlot(String key) { return 0;