testMovedRedirectInCluster added. #1397

pull/1423/head
Nikita 7 years ago
parent 4e2e95d4a7
commit 2b2aace839

@ -547,4 +547,9 @@ public class MasterSlaveEntry {
return slots;
}
@Override
public String toString() {
return "MasterSlaveEntry [masterEntry=" + masterEntry + "]";
}
}

@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.AfterClass;
@ -27,6 +28,7 @@ import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.ClusterNode;
import org.redisson.api.Node;
@ -36,15 +38,22 @@ import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisOutOfMemoryException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.cluster.ClusterNodeInfo.Flag;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.SerializationCodec;
import org.redisson.config.Config;
import org.redisson.connection.CRC16;
import org.redisson.connection.ConnectionListener;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.HashValue;
@ -630,6 +639,96 @@ public class RedissonTest {
Assert.assertTrue(nodes.pingAll());
}
@Test
public void testMovedRedirectInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slot1)
.addNode(master2, slot2)
.addNode(master3, slot3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setScanInterval(100000)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedisClientConfig cfg = new RedisClientConfig();
cfg.setAddress(process.getNodes().iterator().next().getRedisServerAddressAndPort());
RedisClient c = RedisClient.create(cfg);
RedisConnection cc = c.connect();
List<ClusterNodeInfo> cn = cc.sync(RedisCommands.CLUSTER_NODES);
cn = cn.stream().filter(i -> i.containsFlag(Flag.MASTER)).collect(Collectors.toList());
Iterator<ClusterNodeInfo> nodesIter = cn.iterator();
ClusterNodeInfo source = nodesIter.next();
ClusterNodeInfo destination = nodesIter.next();
RedisClientConfig sourceCfg = new RedisClientConfig();
sourceCfg.setAddress(source.getAddress());
RedisClient sourceClient = RedisClient.create(sourceCfg);
RedisConnection sourceConnection = sourceClient.connect();
RedisClientConfig destinationCfg = new RedisClientConfig();
destinationCfg.setAddress(destination.getAddress());
RedisClient destinationClient = RedisClient.create(destinationCfg);
RedisConnection destinationConnection = destinationClient.connect();
String key = null;
int slot = 0;
for (int i = 0; i < 100000; i++) {
key = "" + i;
slot = CRC16.crc16(key.getBytes()) % MasterSlaveConnectionManager.MAX_SLOT;
if (source.getSlotRanges().iterator().next().getStartSlot() == slot) {
break;
}
}
redisson.getBucket(key).set("123");
destinationConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "IMPORTING", source.getNodeId());
sourceConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "MIGRATING", destination.getNodeId());
List<String> keys = sourceConnection.sync(RedisCommands.CLUSTER_GETKEYSINSLOT, source.getSlotRanges().iterator().next().getStartSlot(), 100);
List<Object> params = new ArrayList<Object>();
params.add(destination.getAddress().getHost());
params.add(destination.getAddress().getPort());
params.add("");
params.add(0);
params.add(2000);
params.add("KEYS");
params.addAll(keys);
sourceConnection.async(RedisCommands.MIGRATE, params.toArray());
for (ClusterNodeInfo node : cn) {
RedisClientConfig cc1 = new RedisClientConfig();
cc1.setAddress(node.getAddress());
RedisClient ccc = RedisClient.create(cc1);
RedisConnection connection = ccc.connect();
connection.sync(RedisCommands.CLUSTER_SETSLOT, slot, "NODE", destination.getNodeId());
}
redisson.getBucket(key).set("123");
List<ClusterNodeInfo> nodes = cc.sync(RedisCommands.CLUSTER_NODES);
for (ClusterNodeInfo clusterNodeInfo : nodes) {
System.out.println(clusterNodeInfo);
}
redisson.shutdown();
process.shutdown();
}
@Test(expected = RedisConnectionException.class)
public void testSingleConnectionFail() throws InterruptedException {

Loading…
Cancel
Save