From 9df878be830ddb8f52332053f97933b49bbec0dd Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 13 Jul 2016 13:24:13 +0300 Subject: [PATCH] refactoring --- .../redisson/command/CommandAsyncService.java | 140 ++++++++---------- 1 file changed, 61 insertions(+), 79 deletions(-) diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index eae849358..bcac77fb5 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -55,7 +55,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.util.Timeout; import io.netty.util.TimerTask; -import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; @@ -134,11 +133,18 @@ public class CommandAsyncService implements CommandAsyncExecutor { public Future> readAllAsync(RedisCommand command, Object ... params) { final Promise> mainPromise = connectionManager.newPromise(); final Set nodes = connectionManager.getEntrySet(); - Promise promise = new DefaultPromise() { - List results = new ArrayList(); - AtomicInteger counter = new AtomicInteger(nodes.size()); + Promise promise = connectionManager.newPromise(); + final List results = new ArrayList(); + final AtomicInteger counter = new AtomicInteger(nodes.size()); + promise.addListener(new FutureListener() { @Override - public Promise setSuccess(R result) { + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + mainPromise.setFailure(future.cause()); + return; + } + + R result = future.getNow(); if (result instanceof Collection) { synchronized (results) { results.addAll((Collection)result); @@ -148,21 +154,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { results.add(result); } } - + if (counter.decrementAndGet() == 0 - && !mainPromise.isDone()) { + && !mainPromise.isDone()) { mainPromise.setSuccess(results); } - return this; } - - @Override - public Promise setFailure(Throwable cause) { - mainPromise.setFailure(cause); - return this; - } - - }; + }); for (MasterSlaveEntry entry : nodes) { async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0); @@ -224,12 +222,18 @@ public class CommandAsyncService implements CommandAsyncExecutor { private Future allAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, Object ... params) { final Promise mainPromise = connectionManager.newPromise(); final Set nodes = connectionManager.getEntrySet(); - Promise promise = new DefaultPromise() { - AtomicInteger counter = new AtomicInteger(nodes.size()); + Promise promise = connectionManager.newPromise(); + final AtomicInteger counter = new AtomicInteger(nodes.size()); + promise.addListener(new FutureListener() { @Override - public Promise setSuccess(T result) { + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + mainPromise.setFailure(future.cause()); + return; + } + if (callback != null) { - callback.onSlotResult(result); + callback.onSlotResult(future.getNow()); } if (counter.decrementAndGet() == 0) { if (callback != null) { @@ -238,15 +242,9 @@ public class CommandAsyncService implements CommandAsyncExecutor { mainPromise.setSuccess(null); } } - return this; } + }); - @Override - public Promise setFailure(Throwable cause) { - mainPromise.setFailure(cause); - return this; - } - }; for (MasterSlaveEntry entry : nodes) { async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0); } @@ -350,24 +348,24 @@ public class CommandAsyncService implements CommandAsyncExecutor { public Future evalAllAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, String script, List keys, Object ... params) { final Promise mainPromise = connectionManager.newPromise(); final Set entries = connectionManager.getEntrySet(); - Promise promise = new DefaultPromise() { - AtomicInteger counter = new AtomicInteger(entries.size()); + Promise promise = connectionManager.newPromise(); + final AtomicInteger counter = new AtomicInteger(entries.size()); + promise.addListener(new FutureListener() { + @Override - public Promise setSuccess(T result) { - callback.onSlotResult(result); + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + mainPromise.setFailure(future.cause()); + return; + } + + callback.onSlotResult(future.getNow()); if (counter.decrementAndGet() == 0 && !mainPromise.isDone()) { mainPromise.setSuccess(callback.onFinish()); } - return this; - } - - @Override - public Promise setFailure(Throwable cause) { - mainPromise.setFailure(cause); - return this; } - }; + }); List args = new ArrayList(2 + keys.size() + params.length); args.add(script); @@ -478,28 +476,20 @@ public class CommandAsyncService implements CommandAsyncExecutor { Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); details.setTimeout(timeout); - if (connectionFuture.isDone()) { - checkConnectionFuture(source, details); - } else { - connectionFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future connFuture) throws Exception { - checkConnectionFuture(source, details); - } - }); - } + connectionFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future connFuture) throws Exception { + checkConnectionFuture(source, details); + } + }); - if (attemptPromise.isDone()) { - checkAttemptFuture(source, details, attemptPromise); - } else { - attemptPromise.addListener(new FutureListener() { + attemptPromise.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - checkAttemptFuture(source, details, future); - } - }); - } + @Override + public void operationComplete(Future future) throws Exception { + checkAttemptFuture(source, details, future); + } + }); } private void checkWriteFuture(final AsyncDetails details, final RedisConnection connection) { @@ -644,32 +634,24 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.setWriteFuture(future); } - if (details.getWriteFuture().isDone()) { - checkWriteFuture(details, connection); - } else { - details.getWriteFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - checkWriteFuture(details, connection); - } - }); - } + details.getWriteFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + checkWriteFuture(details, connection); + } + }); releaseConnection(source, details.getConnectionFuture(), details.isReadOnlyMode(), details.getAttemptPromise(), details); } protected void releaseConnection(final NodeSource source, final Future connectionFuture, final boolean isReadOnly, Promise attemptPromise, final AsyncDetails details) { - if (attemptPromise.isDone()) { - releaseConnection(isReadOnly, source, connectionFuture, details); - } else { - attemptPromise.addListener(new FutureListener() { - @Override - public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { - releaseConnection(isReadOnly, source, connectionFuture, details); - } - }); - } + attemptPromise.addListener(new FutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + releaseConnection(isReadOnly, source, connectionFuture, details); + } + }); } private void releaseConnection(boolean isReadOnly, NodeSource source, Future connectionFuture, AsyncDetails details) {