refactoring

pull/555/head
Nikita 9 years ago
parent fae7f5b8ed
commit 9df878be83

@ -55,7 +55,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
@ -134,11 +133,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <T, R> Future<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params) {
final Promise<Collection<R>> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
Promise<R> promise = new DefaultPromise<R>() {
List<R> results = new ArrayList<R>();
AtomicInteger counter = new AtomicInteger(nodes.size());
Promise<R> promise = connectionManager.newPromise();
final List<R> results = new ArrayList<R>();
final AtomicInteger counter = new AtomicInteger(nodes.size());
promise.addListener(new FutureListener<R>() {
@Override
public Promise<R> setSuccess(R result) {
public void operationComplete(Future<R> future) throws Exception {
if (!future.isSuccess()) {
mainPromise.setFailure(future.cause());
return;
}
R result = future.getNow();
if (result instanceof Collection) {
synchronized (results) {
results.addAll((Collection)result);
@ -153,16 +159,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
&& !mainPromise.isDone()) {
mainPromise.setSuccess(results);
}
return this;
}
@Override
public Promise<R> setFailure(Throwable cause) {
mainPromise.setFailure(cause);
return this;
}
};
});
for (MasterSlaveEntry entry : nodes) {
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0);
@ -224,12 +222,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private <T, R> Future<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object ... params) {
final Promise<R> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
Promise<T> promise = new DefaultPromise<T>() {
AtomicInteger counter = new AtomicInteger(nodes.size());
Promise<T> promise = connectionManager.newPromise();
final AtomicInteger counter = new AtomicInteger(nodes.size());
promise.addListener(new FutureListener<T>() {
@Override
public Promise<T> setSuccess(T result) {
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
mainPromise.setFailure(future.cause());
return;
}
if (callback != null) {
callback.onSlotResult(result);
callback.onSlotResult(future.getNow());
}
if (counter.decrementAndGet() == 0) {
if (callback != null) {
@ -238,15 +242,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
mainPromise.setSuccess(null);
}
}
return this;
}
});
@Override
public Promise<T> setFailure(Throwable cause) {
mainPromise.setFailure(cause);
return this;
}
};
for (MasterSlaveEntry entry : nodes) {
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0);
}
@ -350,24 +348,24 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <T, R> Future<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
final Promise<R> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> entries = connectionManager.getEntrySet();
Promise<T> promise = new DefaultPromise<T>() {
AtomicInteger counter = new AtomicInteger(entries.size());
Promise<T> promise = connectionManager.newPromise();
final AtomicInteger counter = new AtomicInteger(entries.size());
promise.addListener(new FutureListener<T>() {
@Override
public Promise<T> setSuccess(T result) {
callback.onSlotResult(result);
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
mainPromise.setFailure(future.cause());
return;
}
callback.onSlotResult(future.getNow());
if (counter.decrementAndGet() == 0
&& !mainPromise.isDone()) {
mainPromise.setSuccess(callback.onFinish());
}
return this;
}
@Override
public Promise<T> setFailure(Throwable cause) {
mainPromise.setFailure(cause);
return this;
}
};
});
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
@ -478,20 +476,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
if (connectionFuture.isDone()) {
checkConnectionFuture(source, details);
} else {
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
checkConnectionFuture(source, details);
}
});
}
if (attemptPromise.isDone()) {
checkAttemptFuture(source, details, attemptPromise);
} else {
attemptPromise.addListener(new FutureListener<R>() {
@Override
@ -500,7 +491,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
});
}
}
private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final RedisConnection connection) {
ChannelFuture future = details.getWriteFuture();
@ -644,25 +634,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.setWriteFuture(future);
}
if (details.getWriteFuture().isDone()) {
checkWriteFuture(details, connection);
} else {
details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(details, connection);
}
});
}
releaseConnection(source, details.getConnectionFuture(), details.isReadOnlyMode(), details.getAttemptPromise(), details);
}
protected <V, R> void releaseConnection(final NodeSource source, final Future<RedisConnection> connectionFuture,
final boolean isReadOnly, Promise<R> attemptPromise, final AsyncDetails<V, R> details) {
if (attemptPromise.isDone()) {
releaseConnection(isReadOnly, source, connectionFuture, details);
} else {
attemptPromise.addListener(new FutureListener<R>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<R> future) throws Exception {
@ -670,7 +653,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
});
}
}
private <V, R> void releaseConnection(boolean isReadOnly, NodeSource source, Future<RedisConnection> connectionFuture, AsyncDetails<V, R> details) {
if (!connectionFuture.isSuccess()) {

Loading…
Cancel
Save