Fixed - RScript.scriptLoad() doesn't load script into Slave nodes. #4040

pull/4045/head
Nikita Koksharov 3 years ago
parent 72b9f1fabc
commit bcb22e4b50

@ -15,6 +15,7 @@
*/
package org.redisson;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.api.RScript;
import org.redisson.client.codec.Codec;
@ -22,8 +23,14 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.util.*;
import java.util.concurrent.CompletableFuture;
/**
*
@ -56,18 +63,24 @@ public class RedissonScript implements RScript {
@Override
public RFuture<String> scriptLoadAsync(String luaScript) {
return commandExecutor.writeAllAsync(StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, new SlotCallback<String, String>() {
volatile String result;
@Override
public void onSlotResult(String result) {
this.result = result;
}
@Override
public String onFinish() {
return result;
}
}, luaScript);
Collection<MasterSlaveEntry> nodes = commandExecutor.getConnectionManager().getEntrySet();
List<CompletableFuture<String>> futures = new ArrayList<>();
nodes.forEach(e -> {
RPromise<String> promise = new RedissonPromise<>();
commandExecutor.async(false, new NodeSource(e), codec, RedisCommands.SCRIPT_LOAD,
new Object[]{luaScript}, promise, true, false);
futures.add(promise.toCompletableFuture());
e.getAllEntries().stream().filter(c -> c.getNodeType() == NodeType.SLAVE).forEach(c -> {
RPromise<String> slavePromise = new RedissonPromise<>();
commandExecutor.async(true, new NodeSource(e, c.getClient()), codec, RedisCommands.SCRIPT_LOAD,
new Object[]{luaScript}, slavePromise, true, false);
futures.add(slavePromise.toCompletableFuture());
});
});
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<String> s = f.thenApply(r -> futures.get(0).getNow(null));
return new CompletableFutureWrapper<>(s);
}
@Override

@ -24,7 +24,9 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.RPromise;
import java.util.Collection;
import java.util.List;
@ -108,7 +110,11 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> readRandomAsync(Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readRandomAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params);
<V, R> void async(boolean readOnlyMode, NodeSource source, Codec codec,
RedisCommand<V> command, Object[] params, RPromise<R> mainPromise,
boolean ignoreRedirect, boolean noRetry);
<V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String... queueNames);
ByteBuf encode(Codec codec, Object value);

Loading…
Cancel
Save