Fixed - RBatch doesn't handle MOVED, ASK errors in Redis Cluster #3102

Fixed - Thread get stuck on RLock lock method during Redis cluster slots migration #2921
pull/3118/head
Nikita Koksharov 4 years ago
parent 11acea57de
commit e48908ce6b

@ -141,11 +141,11 @@ public class CommandBatchService extends CommandAsyncService {
if (isRedisBasedQueue()) {
boolean isReadOnly = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC;
RedisExecutor<V, R> executor = new RedisQueuedBatchExecutor<>(isReadOnly, nodeSource, codec, command, params, mainPromise,
true, connectionManager, objectBuilder, commands, connections, options, index, executed, semaphore);
false, connectionManager, objectBuilder, commands, connections, options, index, executed, semaphore);
executor.execute();
} else {
RedisExecutor<V, R> executor = new RedisBatchExecutor<>(readOnlyMode, nodeSource, codec, command, params, mainPromise,
true, connectionManager, objectBuilder, commands, options, index, executed);
false, connectionManager, objectBuilder, commands, options, index, executed);
executor.execute();
}
@ -227,8 +227,21 @@ public class CommandBatchService extends CommandAsyncService {
RPromise<Void> voidPromise = new RedissonPromise<Void>();
if (this.options.isSkipResult()
&& this.options.getSyncSlaves() == 0) {
voidPromise.onComplete((res, e) -> {
voidPromise.onComplete((res, ex) -> {
executed.set(true);
if (ex != null) {
for (Entry e : commands.values()) {
e.getCommands().forEach(t -> t.tryFailure(ex));
}
promise.tryFailure(ex);
commands.clear();
nestedServices.clear();
return;
}
commands.clear();
nestedServices.clear();
promise.trySuccess(new BatchResult<>(Collections.emptyList(), 0));
@ -237,12 +250,12 @@ public class CommandBatchService extends CommandAsyncService {
voidPromise.onComplete((res, ex) -> {
executed.set(true);
if (ex != null) {
promise.tryFailure(ex);
for (Entry e : commands.values()) {
e.getCommands().forEach(t -> t.tryFailure(ex));
}
promise.tryFailure(ex);
commands.clear();
nestedServices.clear();
return;

@ -51,7 +51,7 @@ public class RedisCommonBatchExecutor extends RedisExecutor<Object, Void> {
public RedisCommonBatchExecutor(NodeSource source, RPromise<Void> mainPromise,
ConnectionManager connectionManager, BatchOptions options, Entry entry, AtomicInteger slots) {
super(entry.isReadOnlyMode(), source, null, null, null, mainPromise, true, connectionManager, null);
super(entry.isReadOnlyMode(), source, null, null, null, mainPromise, false, connectionManager, null);
this.options = options;
this.entry = entry;
this.slots = slots;

@ -13,19 +13,20 @@ import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.api.*;
import org.redisson.api.BatchOptions.ExecutionMode;
import org.redisson.api.RScript.Mode;
import org.redisson.client.RedisException;
import org.redisson.client.*;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@ -53,23 +54,92 @@ public class RedissonBatchTest extends BaseTest {
batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
}
}
// @Test
public void testBatchRedirect() {
@Test
public void testSlotMigrationInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slot1)
.addNode(master2, slot2)
.addNode(master3, slot3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setScanInterval(1000)
.setSubscriptionMode(SubscriptionMode.MASTER)
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedisClientConfig cfg = new RedisClientConfig();
cfg.setAddress(process.getNodes().iterator().next().getRedisServerAddressAndPort());
RedisClient c = RedisClient.create(cfg);
RedisConnection cc = c.connect();
List<ClusterNodeInfo> mastersList = cc.sync(RedisCommands.CLUSTER_NODES);
mastersList = mastersList.stream().filter(i -> i.containsFlag(ClusterNodeInfo.Flag.MASTER)).collect(Collectors.toList());
c.shutdown();
ClusterNodeInfo destination = mastersList.stream().filter(i -> i.getSlotRanges().iterator().next().getStartSlot() != 10922).findAny().get();
ClusterNodeInfo source = mastersList.stream().filter(i -> i.getSlotRanges().iterator().next().getStartSlot() == 10922).findAny().get();
RedisClientConfig sourceCfg = new RedisClientConfig();
sourceCfg.setAddress(source.getAddress());
RedisClient sourceClient = RedisClient.create(sourceCfg);
RedisConnection sourceConnection = sourceClient.connect();
RedisClientConfig destinationCfg = new RedisClientConfig();
destinationCfg.setAddress(destination.getAddress());
RedisClient destinationClient = RedisClient.create(destinationCfg);
RedisConnection destinationConnection = destinationClient.connect();
String lockName = "test{kaO}";
RBatch batch = redisson.createBatch(batchOptions);
List<RFuture<Boolean>> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
batch.getMap("" + i).fastPutAsync("" + i, i);
RFuture<Boolean> f = batch.getMap(lockName).fastPutAsync("" + i, i);
futures.add(f);
}
batch.execute();
batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 1; i++) {
batch.getMap("" + i).sizeAsync();
batch.getMap("" + i).containsValueAsync("" + i);
batch.getMap("" + i).containsValueAsync(i);
destinationConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "IMPORTING", source.getNodeId());
sourceConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "MIGRATING", destination.getNodeId());
List<String> keys = sourceConnection.sync(RedisCommands.CLUSTER_GETKEYSINSLOT, source.getSlotRanges().iterator().next().getStartSlot(), 100);
List<Object> params = new ArrayList<Object>();
params.add(destination.getAddress().getHost());
params.add(destination.getAddress().getPort());
params.add("");
params.add(0);
params.add(2000);
params.add("KEYS");
params.addAll(keys);
sourceConnection.async(RedisCommands.MIGRATE, params.toArray());
for (ClusterNodeInfo node : mastersList) {
RedisClientConfig cc1 = new RedisClientConfig();
cc1.setAddress(node.getAddress());
RedisClient ccc = RedisClient.create(cc1);
RedisConnection connection = ccc.connect();
connection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "NODE", destination.getNodeId());
ccc.shutdownAsync();
}
BatchResult<?> t = batch.execute();
System.out.println(t);
Thread.sleep(2000);
batch.execute();
futures.forEach(f -> assertThat(f.awaitUninterruptibly(1)).isTrue());
sourceClient.shutdown();
destinationClient.shutdown();
redisson.shutdown();
process.shutdown();
}
@Test

Loading…
Cancel
Save