diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 86ea95685..d7651b8aa 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -465,27 +465,12 @@ public class CommandAsyncService implements CommandAsyncExecutor { return mainPromise; } - private RFuture loadScript(List keys, String script) { - if (!keys.isEmpty()) { - Object key = keys.get(0); - if (key instanceof byte[]) { - return writeAsync((byte[]) key, StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, script); - } - return writeAsync((String) key, StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, script); + private RFuture loadScript(RedisClient client, String script) { + MasterSlaveEntry entry = getConnectionManager().getEntry(client); + if (entry.getClient().equals(client)) { + return writeAsync(entry, StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, script); } - - return writeAllAsync(RedisCommands.SCRIPT_LOAD, new SlotCallback() { - volatile String result; - @Override - public void onSlotResult(String result) { - this.result = result; - } - - @Override - public String onFinish() { - return result; - } - }, script); + return readAsync(client, StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, script); } protected boolean isEvalCacheActive() { @@ -540,12 +525,15 @@ public class CommandAsyncService implements CommandAsyncExecutor { args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(params)); - async(false, nodeSource, codec, cmd, args.toArray(), promise, false); - + + RedisExecutor executor = new RedisExecutor<>(readOnlyMode, nodeSource, codec, cmd, + args.toArray(), promise, false, connectionManager, objectBuilder); + executor.execute(); + promise.onComplete((res, e) -> { if (e != null) { if (e.getMessage().startsWith("NOSCRIPT")) { - RFuture loadFuture = loadScript(keys, script); + RFuture loadFuture = loadScript(executor.getRedisClient(), script); loadFuture.onComplete((r, ex) -> { if (ex != null) { free(pps); @@ -559,7 +547,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { newargs.add(keys.size()); newargs.addAll(keys); newargs.addAll(Arrays.asList(pps)); - async(false, nodeSource, codec, command, newargs.toArray(), mainPromise, false); + + NodeSource ns = nodeSource; + if (ns.getRedisClient() == null) { + ns = new NodeSource(nodeSource, executor.getRedisClient()); + } + + async(readOnlyMode, ns, codec, command, newargs.toArray(), mainPromise, false); }); } else { free(pps); diff --git a/redisson/src/main/java/org/redisson/command/RedisExecutor.java b/redisson/src/main/java/org/redisson/command/RedisExecutor.java index 45b3a4111..466a57d81 100644 --- a/redisson/src/main/java/org/redisson/command/RedisExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisExecutor.java @@ -67,6 +67,7 @@ public class RedisExecutor { final RedissonObjectBuilder objectBuilder; final ConnectionManager connectionManager; + RFuture connectionFuture; NodeSource source; Codec codec; volatile int attempt; @@ -634,8 +635,11 @@ public class RedisExecutor { }); } + public RedisClient getRedisClient() { + return connectionFuture.getNow().getRedisClient(); + } + protected RFuture getConnection() { - RFuture connectionFuture; if (readOnlyMode) { connectionFuture = connectionManager.connectionReadOp(source, command); } else { diff --git a/redisson/src/main/java/org/redisson/connection/NodeSource.java b/redisson/src/main/java/org/redisson/connection/NodeSource.java index 616b34178..d0732e17c 100644 --- a/redisson/src/main/java/org/redisson/connection/NodeSource.java +++ b/redisson/src/main/java/org/redisson/connection/NodeSource.java @@ -33,6 +33,14 @@ public class NodeSource { private Redirect redirect; private MasterSlaveEntry entry; + public NodeSource(NodeSource nodeSource, RedisClient redisClient) { + this.slot = nodeSource.slot; + this.addr = nodeSource.addr; + this.redisClient = redisClient; + this.redirect = nodeSource.getRedirect(); + this.entry = nodeSource.getEntry(); + } + public NodeSource(MasterSlaveEntry entry) { this.entry = entry; } @@ -76,7 +84,7 @@ public class NodeSource { public RedisClient getRedisClient() { return redisClient; } - + public RedisURI getAddr() { return addr; } diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index 7107d0779..6f78b1d55 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -876,6 +876,38 @@ public class RedissonTest { assertThat(c.toYAML()).isEqualTo(t); } + @Test + public void testEvalCache() throws InterruptedException, IOException { + RedisRunner master1 = new RedisRunner().port(6896).randomDir().nosave(); + RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); + RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + ClusterRunner.ClusterProcesses process = clusterRunner.run(); + + Thread.sleep(5000); + + Config config = new Config(); + config.setUseScriptCache(true); + config.useClusterServers() + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + RTimeSeries t = redisson.getTimeSeries("test"); + t.add(4, "40"); + t.add(2, "20"); + t.add(1, "10", 1, TimeUnit.SECONDS); + + t.size(); + } + @Test public void testMovedRedirectInCluster() throws Exception { RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();