Fixed - all cached Lua scripts are executed on Redis master nodes only #2973

pull/2979/head
Nikita Koksharov 5 years ago
parent be8d293c87
commit b81720e3c5

@ -465,27 +465,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return mainPromise; return mainPromise;
} }
private RFuture<String> loadScript(List<Object> keys, String script) { private RFuture<String> loadScript(RedisClient client, String script) {
if (!keys.isEmpty()) { MasterSlaveEntry entry = getConnectionManager().getEntry(client);
Object key = keys.get(0); if (entry.getClient().equals(client)) {
if (key instanceof byte[]) { return writeAsync(entry, StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, script);
return writeAsync((byte[]) key, StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, script);
}
return writeAsync((String) key, StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, script);
} }
return readAsync(client, StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, script);
return writeAllAsync(RedisCommands.SCRIPT_LOAD, new SlotCallback<String, String>() {
volatile String result;
@Override
public void onSlotResult(String result) {
this.result = result;
}
@Override
public String onFinish() {
return result;
}
}, script);
} }
protected boolean isEvalCacheActive() { protected boolean isEvalCacheActive() {
@ -540,12 +525,15 @@ public class CommandAsyncService implements CommandAsyncExecutor {
args.add(keys.size()); args.add(keys.size());
args.addAll(keys); args.addAll(keys);
args.addAll(Arrays.asList(params)); args.addAll(Arrays.asList(params));
async(false, nodeSource, codec, cmd, args.toArray(), promise, false);
RedisExecutor<T, R> executor = new RedisExecutor<>(readOnlyMode, nodeSource, codec, cmd,
args.toArray(), promise, false, connectionManager, objectBuilder);
executor.execute();
promise.onComplete((res, e) -> { promise.onComplete((res, e) -> {
if (e != null) { if (e != null) {
if (e.getMessage().startsWith("NOSCRIPT")) { if (e.getMessage().startsWith("NOSCRIPT")) {
RFuture<String> loadFuture = loadScript(keys, script); RFuture<String> loadFuture = loadScript(executor.getRedisClient(), script);
loadFuture.onComplete((r, ex) -> { loadFuture.onComplete((r, ex) -> {
if (ex != null) { if (ex != null) {
free(pps); free(pps);
@ -559,7 +547,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
newargs.add(keys.size()); newargs.add(keys.size());
newargs.addAll(keys); newargs.addAll(keys);
newargs.addAll(Arrays.asList(pps)); newargs.addAll(Arrays.asList(pps));
async(false, nodeSource, codec, command, newargs.toArray(), mainPromise, false);
NodeSource ns = nodeSource;
if (ns.getRedisClient() == null) {
ns = new NodeSource(nodeSource, executor.getRedisClient());
}
async(readOnlyMode, ns, codec, command, newargs.toArray(), mainPromise, false);
}); });
} else { } else {
free(pps); free(pps);

@ -67,6 +67,7 @@ public class RedisExecutor<V, R> {
final RedissonObjectBuilder objectBuilder; final RedissonObjectBuilder objectBuilder;
final ConnectionManager connectionManager; final ConnectionManager connectionManager;
RFuture<RedisConnection> connectionFuture;
NodeSource source; NodeSource source;
Codec codec; Codec codec;
volatile int attempt; volatile int attempt;
@ -634,8 +635,11 @@ public class RedisExecutor<V, R> {
}); });
} }
public RedisClient getRedisClient() {
return connectionFuture.getNow().getRedisClient();
}
protected RFuture<RedisConnection> getConnection() { protected RFuture<RedisConnection> getConnection() {
RFuture<RedisConnection> connectionFuture;
if (readOnlyMode) { if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(source, command); connectionFuture = connectionManager.connectionReadOp(source, command);
} else { } else {

@ -33,6 +33,14 @@ public class NodeSource {
private Redirect redirect; private Redirect redirect;
private MasterSlaveEntry entry; private MasterSlaveEntry entry;
public NodeSource(NodeSource nodeSource, RedisClient redisClient) {
this.slot = nodeSource.slot;
this.addr = nodeSource.addr;
this.redisClient = redisClient;
this.redirect = nodeSource.getRedirect();
this.entry = nodeSource.getEntry();
}
public NodeSource(MasterSlaveEntry entry) { public NodeSource(MasterSlaveEntry entry) {
this.entry = entry; this.entry = entry;
} }
@ -76,7 +84,7 @@ public class NodeSource {
public RedisClient getRedisClient() { public RedisClient getRedisClient() {
return redisClient; return redisClient;
} }
public RedisURI getAddr() { public RedisURI getAddr() {
return addr; return addr;
} }

@ -876,6 +876,38 @@ public class RedissonTest {
assertThat(c.toYAML()).isEqualTo(t); assertThat(c.toYAML()).isEqualTo(t);
} }
@Test
public void testEvalCache() throws InterruptedException, IOException {
RedisRunner master1 = new RedisRunner().port(6896).randomDir().nosave();
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config();
config.setUseScriptCache(true);
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RTimeSeries<String> t = redisson.getTimeSeries("test");
t.add(4, "40");
t.add(2, "20");
t.add(1, "10", 1, TimeUnit.SECONDS);
t.size();
}
@Test @Test
public void testMovedRedirectInCluster() throws Exception { public void testMovedRedirectInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();

Loading…
Cancel
Save