From e48908ce6b307c313c86dd7447de845e1fddab09 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 6 Oct 2020 14:34:31 +0300 Subject: [PATCH] Fixed - RBatch doesn't handle MOVED, ASK errors in Redis Cluster #3102 Fixed - Thread get stuck on RLock lock method during Redis cluster slots migration #2921 --- .../redisson/command/CommandBatchService.java | 23 +++- .../command/RedisCommonBatchExecutor.java | 2 +- .../java/org/redisson/RedissonBatchTest.java | 104 +++++++++++++++--- 3 files changed, 106 insertions(+), 23 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 747ce7897..79c70071e 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -141,11 +141,11 @@ public class CommandBatchService extends CommandAsyncService { if (isRedisBasedQueue()) { boolean isReadOnly = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC; RedisExecutor executor = new RedisQueuedBatchExecutor<>(isReadOnly, nodeSource, codec, command, params, mainPromise, - true, connectionManager, objectBuilder, commands, connections, options, index, executed, semaphore); + false, connectionManager, objectBuilder, commands, connections, options, index, executed, semaphore); executor.execute(); } else { RedisExecutor executor = new RedisBatchExecutor<>(readOnlyMode, nodeSource, codec, command, params, mainPromise, - true, connectionManager, objectBuilder, commands, options, index, executed); + false, connectionManager, objectBuilder, commands, options, index, executed); executor.execute(); } @@ -227,8 +227,21 @@ public class CommandBatchService extends CommandAsyncService { RPromise voidPromise = new RedissonPromise(); if (this.options.isSkipResult() && this.options.getSyncSlaves() == 0) { - voidPromise.onComplete((res, e) -> { + voidPromise.onComplete((res, ex) -> { executed.set(true); + + if (ex != null) { + for (Entry e : commands.values()) { + e.getCommands().forEach(t -> t.tryFailure(ex)); + } + + promise.tryFailure(ex); + + commands.clear(); + nestedServices.clear(); + return; + } + commands.clear(); nestedServices.clear(); promise.trySuccess(new BatchResult<>(Collections.emptyList(), 0)); @@ -237,12 +250,12 @@ public class CommandBatchService extends CommandAsyncService { voidPromise.onComplete((res, ex) -> { executed.set(true); if (ex != null) { - promise.tryFailure(ex); - for (Entry e : commands.values()) { e.getCommands().forEach(t -> t.tryFailure(ex)); } + promise.tryFailure(ex); + commands.clear(); nestedServices.clear(); return; diff --git a/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java b/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java index 1cd5e5aec..306200d64 100644 --- a/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java @@ -51,7 +51,7 @@ public class RedisCommonBatchExecutor extends RedisExecutor { public RedisCommonBatchExecutor(NodeSource source, RPromise mainPromise, ConnectionManager connectionManager, BatchOptions options, Entry entry, AtomicInteger slots) { - super(entry.isReadOnlyMode(), source, null, null, null, mainPromise, true, connectionManager, null); + super(entry.isReadOnlyMode(), source, null, null, null, mainPromise, false, connectionManager, null); this.options = options; this.entry = entry; this.slots = slots; diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index fd10db292..d8eace9f2 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -13,19 +13,20 @@ import org.redisson.RedisRunner.FailedToStartRedisException; import org.redisson.api.*; import org.redisson.api.BatchOptions.ExecutionMode; import org.redisson.api.RScript.Mode; -import org.redisson.client.RedisException; +import org.redisson.client.*; import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.cluster.ClusterNodeInfo; import org.redisson.config.Config; +import org.redisson.config.SubscriptionMode; import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -53,23 +54,92 @@ public class RedissonBatchTest extends BaseTest { batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC); } } - -// @Test - public void testBatchRedirect() { + + @Test + public void testSlotMigrationInCluster() throws Exception { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave(); + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slot1) + .addNode(master2, slot2) + .addNode(master3, slot3); + ClusterRunner.ClusterProcesses process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setScanInterval(1000) + .setSubscriptionMode(SubscriptionMode.MASTER) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + RedisClientConfig cfg = new RedisClientConfig(); + cfg.setAddress(process.getNodes().iterator().next().getRedisServerAddressAndPort()); + RedisClient c = RedisClient.create(cfg); + RedisConnection cc = c.connect(); + List mastersList = cc.sync(RedisCommands.CLUSTER_NODES); + mastersList = mastersList.stream().filter(i -> i.containsFlag(ClusterNodeInfo.Flag.MASTER)).collect(Collectors.toList()); + c.shutdown(); + + ClusterNodeInfo destination = mastersList.stream().filter(i -> i.getSlotRanges().iterator().next().getStartSlot() != 10922).findAny().get(); + ClusterNodeInfo source = mastersList.stream().filter(i -> i.getSlotRanges().iterator().next().getStartSlot() == 10922).findAny().get(); + + RedisClientConfig sourceCfg = new RedisClientConfig(); + sourceCfg.setAddress(source.getAddress()); + RedisClient sourceClient = RedisClient.create(sourceCfg); + RedisConnection sourceConnection = sourceClient.connect(); + + RedisClientConfig destinationCfg = new RedisClientConfig(); + destinationCfg.setAddress(destination.getAddress()); + RedisClient destinationClient = RedisClient.create(destinationCfg); + RedisConnection destinationConnection = destinationClient.connect(); + + String lockName = "test{kaO}"; + RBatch batch = redisson.createBatch(batchOptions); + List> futures = new ArrayList<>(); for (int i = 0; i < 5; i++) { - batch.getMap("" + i).fastPutAsync("" + i, i); + RFuture f = batch.getMap(lockName).fastPutAsync("" + i, i); + futures.add(f); } - batch.execute(); - batch = redisson.createBatch(batchOptions); - for (int i = 0; i < 1; i++) { - batch.getMap("" + i).sizeAsync(); - batch.getMap("" + i).containsValueAsync("" + i); - batch.getMap("" + i).containsValueAsync(i); + destinationConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "IMPORTING", source.getNodeId()); + sourceConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "MIGRATING", destination.getNodeId()); + + List keys = sourceConnection.sync(RedisCommands.CLUSTER_GETKEYSINSLOT, source.getSlotRanges().iterator().next().getStartSlot(), 100); + List params = new ArrayList(); + params.add(destination.getAddress().getHost()); + params.add(destination.getAddress().getPort()); + params.add(""); + params.add(0); + params.add(2000); + params.add("KEYS"); + params.addAll(keys); + sourceConnection.async(RedisCommands.MIGRATE, params.toArray()); + + for (ClusterNodeInfo node : mastersList) { + RedisClientConfig cc1 = new RedisClientConfig(); + cc1.setAddress(node.getAddress()); + RedisClient ccc = RedisClient.create(cc1); + RedisConnection connection = ccc.connect(); + connection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "NODE", destination.getNodeId()); + ccc.shutdownAsync(); } - BatchResult t = batch.execute(); - System.out.println(t); + + Thread.sleep(2000); + + batch.execute(); + + futures.forEach(f -> assertThat(f.awaitUninterruptibly(1)).isTrue()); + + sourceClient.shutdown(); + destinationClient.shutdown(); + redisson.shutdown(); + process.shutdown(); } @Test