From 2b2aace839189908295d459d93b8fbb520285aea Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 26 Apr 2018 15:56:29 +0300 Subject: [PATCH] testMovedRedirectInCluster added. #1397 --- .../redisson/connection/MasterSlaveEntry.java | 5 + .../test/java/org/redisson/RedissonTest.java | 99 +++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index a9be96df1..304294bec 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -547,4 +547,9 @@ public class MasterSlaveEntry { return slots; } + @Override + public String toString() { + return "MasterSlaveEntry [masterEntry=" + masterEntry + "]"; + } + } diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index 59f2d27c4..66588df3c 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.junit.After; import org.junit.AfterClass; @@ -27,6 +28,7 @@ import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner.RedisProcess; import org.redisson.api.ClusterNode; import org.redisson.api.Node; @@ -36,15 +38,22 @@ import org.redisson.api.RFuture; import org.redisson.api.RMap; import org.redisson.api.RedissonClient; import org.redisson.client.RedisClient; +import org.redisson.client.RedisClientConfig; +import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisOutOfMemoryException; import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; +import org.redisson.cluster.ClusterNodeInfo; +import org.redisson.cluster.ClusterNodeInfo.Flag; import org.redisson.codec.JsonJacksonCodec; import org.redisson.codec.SerializationCodec; import org.redisson.config.Config; +import org.redisson.connection.CRC16; import org.redisson.connection.ConnectionListener; +import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.misc.HashValue; @@ -630,6 +639,96 @@ public class RedissonTest { Assert.assertTrue(nodes.pingAll()); } + + @Test + public void testMovedRedirectInCluster() throws Exception { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave(); + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slot1) + .addNode(master2, slot2) + .addNode(master3, slot3); + ClusterProcesses process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setScanInterval(100000) + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + RedisClientConfig cfg = new RedisClientConfig(); + cfg.setAddress(process.getNodes().iterator().next().getRedisServerAddressAndPort()); + RedisClient c = RedisClient.create(cfg); + RedisConnection cc = c.connect(); + List cn = cc.sync(RedisCommands.CLUSTER_NODES); + cn = cn.stream().filter(i -> i.containsFlag(Flag.MASTER)).collect(Collectors.toList()); + Iterator nodesIter = cn.iterator(); + + + ClusterNodeInfo source = nodesIter.next(); + ClusterNodeInfo destination = nodesIter.next(); + + RedisClientConfig sourceCfg = new RedisClientConfig(); + sourceCfg.setAddress(source.getAddress()); + RedisClient sourceClient = RedisClient.create(sourceCfg); + RedisConnection sourceConnection = sourceClient.connect(); + + RedisClientConfig destinationCfg = new RedisClientConfig(); + destinationCfg.setAddress(destination.getAddress()); + RedisClient destinationClient = RedisClient.create(destinationCfg); + RedisConnection destinationConnection = destinationClient.connect(); + + String key = null; + int slot = 0; + for (int i = 0; i < 100000; i++) { + key = "" + i; + slot = CRC16.crc16(key.getBytes()) % MasterSlaveConnectionManager.MAX_SLOT; + if (source.getSlotRanges().iterator().next().getStartSlot() == slot) { + break; + } + } + + redisson.getBucket(key).set("123"); + + destinationConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "IMPORTING", source.getNodeId()); + sourceConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "MIGRATING", destination.getNodeId()); + + List keys = sourceConnection.sync(RedisCommands.CLUSTER_GETKEYSINSLOT, source.getSlotRanges().iterator().next().getStartSlot(), 100); + List params = new ArrayList(); + params.add(destination.getAddress().getHost()); + params.add(destination.getAddress().getPort()); + params.add(""); + params.add(0); + params.add(2000); + params.add("KEYS"); + params.addAll(keys); + sourceConnection.async(RedisCommands.MIGRATE, params.toArray()); + + for (ClusterNodeInfo node : cn) { + RedisClientConfig cc1 = new RedisClientConfig(); + cc1.setAddress(node.getAddress()); + RedisClient ccc = RedisClient.create(cc1); + RedisConnection connection = ccc.connect(); + connection.sync(RedisCommands.CLUSTER_SETSLOT, slot, "NODE", destination.getNodeId()); + } + + redisson.getBucket(key).set("123"); + + List nodes = cc.sync(RedisCommands.CLUSTER_NODES); + for (ClusterNodeInfo clusterNodeInfo : nodes) { + System.out.println(clusterNodeInfo); + } + + redisson.shutdown(); + process.shutdown(); + } + @Test(expected = RedisConnectionException.class) public void testSingleConnectionFail() throws InterruptedException {