From bcb22e4b50045bc997cddbe9656be11cc9494439 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 23 Dec 2021 10:55:46 +0300 Subject: [PATCH] Fixed - RScript.scriptLoad() doesn't load script into Slave nodes. #4040 --- .../java/org/redisson/RedissonScript.java | 37 +++++++++++++------ .../command/CommandAsyncExecutor.java | 8 +++- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonScript.java b/redisson/src/main/java/org/redisson/RedissonScript.java index 0d174350e..379acd993 100644 --- a/redisson/src/main/java/org/redisson/RedissonScript.java +++ b/redisson/src/main/java/org/redisson/RedissonScript.java @@ -15,6 +15,7 @@ */ package org.redisson; +import org.redisson.api.NodeType; import org.redisson.api.RFuture; import org.redisson.api.RScript; import org.redisson.client.codec.Codec; @@ -22,8 +23,14 @@ import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.connection.MasterSlaveEntry; +import org.redisson.connection.NodeSource; +import org.redisson.misc.CompletableFutureWrapper; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; import java.util.*; +import java.util.concurrent.CompletableFuture; /** * @@ -56,18 +63,24 @@ public class RedissonScript implements RScript { @Override public RFuture scriptLoadAsync(String luaScript) { - return commandExecutor.writeAllAsync(StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, new SlotCallback() { - volatile String result; - @Override - public void onSlotResult(String result) { - this.result = result; - } - - @Override - public String onFinish() { - return result; - } - }, luaScript); + Collection nodes = commandExecutor.getConnectionManager().getEntrySet(); + List> futures = new ArrayList<>(); + nodes.forEach(e -> { + RPromise promise = new RedissonPromise<>(); + commandExecutor.async(false, new NodeSource(e), codec, RedisCommands.SCRIPT_LOAD, + new Object[]{luaScript}, promise, true, false); + futures.add(promise.toCompletableFuture()); + + e.getAllEntries().stream().filter(c -> c.getNodeType() == NodeType.SLAVE).forEach(c -> { + RPromise slavePromise = new RedissonPromise<>(); + commandExecutor.async(true, new NodeSource(e, c.getClient()), codec, RedisCommands.SCRIPT_LOAD, + new Object[]{luaScript}, slavePromise, true, false); + futures.add(slavePromise.toCompletableFuture()); + }); + }); + CompletableFuture f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + CompletableFuture s = f.thenApply(r -> futures.get(0).getNow(null)); + return new CompletableFutureWrapper<>(s); } @Override diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java index e5af40ccf..781784af7 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -24,7 +24,9 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; +import org.redisson.connection.NodeSource; import org.redisson.liveobject.core.RedissonObjectBuilder; +import org.redisson.misc.RPromise; import java.util.Collection; import java.util.List; @@ -108,7 +110,11 @@ public interface CommandAsyncExecutor { RFuture readRandomAsync(Codec codec, RedisCommand command, Object... params); RFuture readRandomAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params); - + + void async(boolean readOnlyMode, NodeSource source, Codec codec, + RedisCommand command, Object[] params, RPromise mainPromise, + boolean ignoreRedirect, boolean noRetry); + RFuture pollFromAnyAsync(String name, Codec codec, RedisCommand command, long secondsTimeout, String... queueNames); ByteBuf encode(Codec codec, Object value);