refactoring

pull/4104/head
Nikita Koksharov 3 years ago
parent bc176f3010
commit 0b4c94c269

@ -15,7 +15,6 @@
*/
package org.redisson;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.api.RScript;
import org.redisson.client.codec.Codec;
@ -23,8 +22,6 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.misc.CompletableFutureWrapper;
import java.util.*;
@ -61,19 +58,7 @@ public class RedissonScript implements RScript {
@Override
public RFuture<String> scriptLoadAsync(String luaScript) {
Collection<MasterSlaveEntry> nodes = commandExecutor.getConnectionManager().getEntrySet();
List<CompletableFuture<String>> futures = new ArrayList<>();
nodes.forEach(e -> {
RFuture<String> promise = commandExecutor.async(false, new NodeSource(e),
codec, RedisCommands.SCRIPT_LOAD, new Object[]{luaScript}, true, false);
futures.add(promise.toCompletableFuture());
e.getAllEntries().stream().filter(c -> c.getNodeType() == NodeType.SLAVE).forEach(c -> {
RFuture<String> slavePromise = commandExecutor.async(true, new NodeSource(e, c.getClient()),
codec, RedisCommands.SCRIPT_LOAD, new Object[]{luaScript}, true, false);
futures.add(slavePromise.toCompletableFuture());
});
});
List<CompletableFuture<String>> futures = commandExecutor.executeAll(RedisCommands.SCRIPT_LOAD, luaScript);
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<String> s = f.thenApply(r -> futures.get(0).getNow(null));
return new CompletableFutureWrapper<>(s);
@ -151,24 +136,20 @@ public class RedissonScript implements RScript {
}
@Override
public RFuture<List<Boolean>> scriptExistsAsync(final String... shaDigests) {
return commandExecutor.writeAllAsync(RedisCommands.SCRIPT_EXISTS, new SlotCallback<List<Boolean>, List<Boolean>>() {
volatile List<Boolean> result = new ArrayList<>(shaDigests.length);
@Override
public synchronized void onSlotResult(List<Boolean> result) {
for (int i = 0; i < result.size(); i++) {
if (this.result.size() == i) {
this.result.add(false);
}
this.result.set(i, this.result.get(i) | result.get(i));
public RFuture<List<Boolean>> scriptExistsAsync(String... shaDigests) {
List<CompletableFuture<List<Boolean>>> futures = commandExecutor.executeAll(RedisCommands.SCRIPT_EXISTS, (Object[]) shaDigests);
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<List<Boolean>> s = f.thenApply(r -> {
List<Boolean> result = futures.get(0).getNow(new ArrayList<>());
for (CompletableFuture<List<Boolean>> future : futures.subList(1, futures.size())) {
List<Boolean> l = future.getNow(new ArrayList<>());
for (int i = 0; i < l.size(); i++) {
result.set(i, result.get(i) | l.get(i));
}
}
@Override
public List<Boolean> onFinish() {
return new ArrayList<>(result);
}
}, (Object[]) shaDigests);
return result;
});
return new CompletableFutureWrapper<>(s);
}
public List<Boolean> scriptExists(String key, String... shaDigests) {
@ -237,13 +218,13 @@ public class RedissonScript implements RScript {
@Override
public <R> R evalSha(String key, Mode mode, String shaDigest, ReturnType returnType, List<Object> keys,
Object... values) {
return commandExecutor.get((RFuture<R>) evalShaAsync(key, mode, shaDigest, returnType, keys, values));
return commandExecutor.get(evalShaAsync(key, mode, shaDigest, returnType, keys, values));
}
@Override
public <R> R eval(String key, Mode mode, String luaScript, ReturnType returnType, List<Object> keys,
Object... values) {
return commandExecutor.get((RFuture<R>) evalAsync(key, mode, luaScript, returnType, keys, values));
return commandExecutor.get(evalAsync(key, mode, luaScript, returnType, keys, values));
}
}

@ -75,7 +75,7 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object... params);
<R> List<CompletableFuture<R>> executeAll(RedisCommand<?> command, Object... params);
<R, T> RFuture<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object... params);

@ -21,6 +21,7 @@ import io.netty.buffer.ByteBufUtil;
import io.netty.util.ReferenceCountUtil;
import org.redisson.RedissonReference;
import org.redisson.SlotCallback;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.cache.LRUCacheMap;
import org.redisson.client.RedisClient;
@ -268,6 +269,24 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return writeAllAsync(command, null, params);
}
@Override
public <R> List<CompletableFuture<R>> executeAll(RedisCommand<?> command, Object... params) {
Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
List<CompletableFuture<R>> futures = new ArrayList<>();
nodes.forEach(e -> {
RFuture<R> promise = async(false, new NodeSource(e),
connectionManager.getCodec(), command, params, true, false);
futures.add(promise.toCompletableFuture());
e.getAllEntries().stream().filter(c -> c.getNodeType() == NodeType.SLAVE).forEach(c -> {
RFuture<R> slavePromise = async(true, new NodeSource(e, c.getClient()),
connectionManager.getCodec(), RedisCommands.SCRIPT_LOAD, params, true, false);
futures.add(slavePromise.toCompletableFuture());
});
});
return futures;
}
@Override
public <R, T> RFuture<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object... params) {
return allAsync(false, connectionManager.getCodec(), command, callback, params);
@ -401,11 +420,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return evalAsync(new NodeSource(entry), false, codec, evalCommandType, script, keys, false, params);
}
@Override
public <T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) {
return evalAllAsync(false, command, callback, script, keys, params);
}
public <T, R> RFuture<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) {
RPromise<R> mainPromise = new RedissonPromise<R>();
Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();

Loading…
Cancel
Save