refactoring

pull/4125/head
Nikita Koksharov 3 years ago
parent d649843210
commit fc0604d7c8

@ -410,38 +410,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return evalAsync(new NodeSource(entry), false, codec, evalCommandType, script, keys, false, params);
}
public <T, R> RFuture<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) {
RPromise<R> mainPromise = new RedissonPromise<R>();
Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
AtomicInteger counter = new AtomicInteger(entries.size());
BiConsumer<T, Throwable> listener = new BiConsumer<T, Throwable>() {
@Override
public void accept(T t, Throwable u) {
if (u != null && !(u instanceof RedisRedirectException)) {
mainPromise.tryFailure(u);
return;
}
callback.onSlotResult(t);
if (counter.decrementAndGet() == 0
&& !mainPromise.isDone()) {
mainPromise.trySuccess(callback.onFinish());
}
}
};
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
for (MasterSlaveEntry entry : entries) {
RFuture<T> promise = async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), true, false);
promise.whenComplete(listener);
}
return mainPromise;
}
private RFuture<String> loadScript(RedisClient client, String script) {
MasterSlaveEntry entry = getConnectionManager().getEntry(client);
if (entry.getClient().equals(client)) {

Loading…
Cancel
Save