Merge pull request #856 from jackygurui/cluster-runner-extras

Cluster runner extras
pull/860/head
Nikita Koksharov 8 years ago committed by GitHub
commit 322e24823e

@ -9,6 +9,10 @@ import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.redisson.misc.BiHashMap;
/**
*
@ -17,7 +21,7 @@ import java.util.List;
public class ClusterRunner {
private final LinkedHashMap<RedisRunner, String> nodes = new LinkedHashMap<>();
private final LinkedHashMap<String, String> masters = new LinkedHashMap<>();
private final LinkedHashMap<String, String> slaveMasters = new LinkedHashMap<>();
public ClusterRunner addNode(RedisRunner runner) {
nodes.putIfAbsent(runner, getRandomId());
@ -41,16 +45,16 @@ public class ClusterRunner {
addNode(master);
for (RedisRunner slave : slaves) {
addNode(slave);
masters.put(nodes.get(slave), nodes.get(master));
slaveMasters.put(nodes.get(slave), nodes.get(master));
}
return this;
}
public List<RedisRunner.RedisProcess> run() throws IOException, InterruptedException, RedisRunner.FailedToStartRedisException {
ArrayList<RedisRunner.RedisProcess> processes = new ArrayList<>();
public synchronized ClusterProcesses run() throws IOException, InterruptedException, RedisRunner.FailedToStartRedisException {
BiHashMap<String, RedisRunner.RedisProcess> processes = new BiHashMap<>();
for (RedisRunner runner : nodes.keySet()) {
List<String> options = getClusterConfig(runner);
String confFile = runner.dir() + File.separatorChar + nodes.get(runner) + ".conf";
String confFile = runner.dir() + File.separator + nodes.get(runner) + ".conf";
System.out.println("WRITING CONFIG: for " + nodes.get(runner));
try (PrintWriter printer = new PrintWriter(new FileWriter(confFile))) {
options.stream().forEach((line) -> {
@ -58,22 +62,21 @@ public class ClusterRunner {
System.out.println(line);
});
}
processes.add(runner.clusterConfigFile(confFile).run());
processes.put(nodes.get(runner), runner.clusterConfigFile(confFile).run());
}
Thread.sleep(1000);
for (RedisRunner.RedisProcess process : processes) {
for (RedisRunner.RedisProcess process : processes.valueSet()) {
if (!process.isAlive()) {
throw new RedisRunner.FailedToStartRedisException();
}
}
return processes;
return new ClusterProcesses(processes);
}
private List<String> getClusterConfig(RedisRunner runner) {
String me = runner.getInitialBindAddr() + ":" + runner.getPort();
List<String> nodeConfig = new ArrayList<>();
int c = 1;
int master = 0;
int c = 0;
for (RedisRunner node : nodes.keySet()) {
String nodeId = nodes.get(node);
StringBuilder sb = new StringBuilder();
@ -87,36 +90,67 @@ public class ClusterRunner {
if (isMaster) {
sb.append("master -");
} else {
sb.append("slave ").append(masters.get(nodeId));
sb.append("slave ").append(slaveMasters.get(nodeId));
}
sb.append(" ");
sb.append("0").append(" ");
sb.append(me.equals(nodeAddr)
? "0"
: "1").append(" ");
sb.append(c).append(" ");
sb.append(c + 1).append(" ");
sb.append("connected ");
if (isMaster) {
sb.append(getSlots(master, masters.size()));
master++;
sb.append(getSlots(c, nodes.size() - slaveMasters.size()));
c++;
}
c++;
nodeConfig.add(sb.toString());
}
nodeConfig.add("vars currentEpoch 0 lastVoteEpoch 0");
return nodeConfig;
}
private String getSlots(int index, int groupNum) {
private static String getSlots(int index, int groupNum) {
final double t = 16383;
int start = index == 0 ? 0 : (int) (t / groupNum * index);
int end = index == groupNum - 1 ? (int) t : (int) (t / groupNum * (index + 1)) - 1;
return start + "-" + end;
}
private String getRandomId() {
private static String getRandomId() {
final SecureRandom r = new SecureRandom();
return new BigInteger(160, r).toString(16);
}
public static class ClusterProcesses {
private final BiHashMap<String, RedisRunner.RedisProcess> processes;
private ClusterProcesses(BiHashMap<String, RedisRunner.RedisProcess> processes) {
this.processes = processes;
}
public RedisRunner.RedisProcess getProcess(String nodeId) {
return processes.get(nodeId);
}
public String getNodeId(RedisRunner.RedisProcess process) {
return processes.reverseGet(process);
}
public Set<RedisRunner.RedisProcess> getNodes() {
return processes.valueSet();
}
public Set<String> getNodeIds() {
return processes.keySet();
}
public synchronized Map<String, Integer> shutdown() {
return processes
.entrySet()
.stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().stop()));
}
}
}

@ -272,7 +272,7 @@ public class RedisRunner {
public RedisProcess runAndCheck() throws IOException, InterruptedException, FailedToStartRedisException {
List<String> args = new ArrayList(options.values());
if (sentinelFile != null && sentinelFile.length() > 0) {
String confFile = defaultDir + File.pathSeparator + sentinelFile;
String confFile = defaultDir + File.separator + sentinelFile;
try (PrintWriter printer = new PrintWriter(new FileWriter(confFile))) {
args.stream().forEach((arg) -> {
if (arg.contains("--")) {
@ -290,10 +290,7 @@ public class RedisRunner {
throw new FailedToStartRedisException();
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
rp.stop();
} catch (InterruptedException ex) {
}
rp.stop();
}));
return rp;
}
@ -828,7 +825,7 @@ public class RedisRunner {
}
public String dir() {
return this.path;
return isRandomDir() ? defaultDir() : this.path;
}
public String getInitialBindAddr() {
@ -845,7 +842,7 @@ public class RedisRunner {
}
public boolean deleteSentinelFile() {
File f = new File(defaultDir + File.pathSeparator + sentinelFile);
File f = new File(defaultDir + File.separator + sentinelFile);
if (f.exists()) {
System.out.println("REDIS RUNNER: Deleting sentinel config file " + f.getAbsolutePath());
return f.delete();
@ -863,7 +860,7 @@ public class RedisRunner {
}
private void makeRandomDefaultDir() {
File f = new File(RedissonRuntimeEnvironment.tempDir + File.pathSeparator + UUID.randomUUID());
File f = new File(RedissonRuntimeEnvironment.tempDir + File.separator + UUID.randomUUID());
if (f.exists()) {
makeRandomDefaultDir();
} else {
@ -884,17 +881,36 @@ public class RedisRunner {
this.runner = runner;
}
public int stop() throws InterruptedException {
public int stop() {
if (runner.isNosave() && !runner.isRandomDir()) {
RedisClient c = createDefaultRedisClientInstance();
RedisConnection connection = c.connect();
connection.async(new RedisStrictCommand<Void>("SHUTDOWN", "NOSAVE", new VoidReplayConvertor()))
.await(3, TimeUnit.SECONDS);
try {
connection.async(new RedisStrictCommand<Void>("SHUTDOWN", "NOSAVE", new VoidReplayConvertor()))
.await(3, TimeUnit.SECONDS);
} catch (InterruptedException interruptedException) {
//shutdown via command failed, lets wait and kill it later.
}
c.shutdown();
connection.closeAsync().syncUninterruptibly();
}
redisProcess.destroy();
int exitCode = redisProcess.isAlive() ? redisProcess.waitFor() : redisProcess.exitValue();
Process p = redisProcess;
p.destroy();
boolean normalTermination = false;
try {
normalTermination = p.waitFor(5, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
//OK lets hurry up by force kill;
}
if (!normalTermination) {
p = p.destroyForcibly();
}
cleanup();
int exitCode = p.exitValue();
return exitCode == 1 && RedissonRuntimeEnvironment.isWindows ? 0 : exitCode;
}
private void cleanup() {
if (runner.isSentinel()) {
runner.deleteSentinelFile();
}
@ -904,9 +920,8 @@ public class RedisRunner {
if (runner.isRandomDir()) {
runner.deleteDBfileDir();
}
return exitCode == 1 && RedissonRuntimeEnvironment.isWindows ? 0 : exitCode;
}
public Process getRedisProcess() {
return redisProcess;
}

@ -1,6 +1,8 @@
package org.redisson.spring.support;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.AfterClass;
import static org.junit.Assert.*;
@ -124,14 +126,24 @@ public class SpringNamespaceTest extends BaseTest {
RedisRunner.getDefaultRedisServerInstance().getRedisServerPort(),
2).run();
System.setProperty("sentinel3Address", sentinel3.getRedisServerAddressAndPort());
RedisRunner slave = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(new RedisRunner().randomPort().randomDir().nosave())
.addNode(new RedisRunner().randomPort().randomDir().nosave())
.addNode(new RedisRunner().randomPort().randomDir().nosave());
List<RedisRunner.RedisProcess> nodes = clusterRunner.run();
nodes.stream().forEach((node) -> {
System.setProperty("node" + (nodes.indexOf(node) + 1) + "Address", node.getRedisServerAddressAndPort());
.addNode(new RedisRunner().randomPort().randomDir().nosave(),//master1
new RedisRunner().randomPort().randomDir().nosave(),//slave1-1
new RedisRunner().randomPort().randomDir().nosave(),//slave1-2
slave)//slave1-3
.addNode(new RedisRunner().randomPort().randomDir().nosave(),//master2
new RedisRunner().randomPort().randomDir().nosave(),//slave2-1
new RedisRunner().randomPort().randomDir().nosave())//slave2-2
.addNode(new RedisRunner().randomPort().randomDir().nosave(),//master3
new RedisRunner().randomPort().randomDir().nosave(),//slave3-1
new RedisRunner().randomPort().randomDir().nosave())//slave3-2
.addNode(slave,//slave1-3
new RedisRunner().randomPort().randomDir().nosave(),//slave1-3-1
new RedisRunner().randomPort().randomDir().nosave());//slave1-3-2
final AtomicLong index = new AtomicLong(0);
clusterRunner.run().getNodes().stream().forEach((node) -> {
System.setProperty("node" + (index.incrementAndGet()) + "Address", node.getRedisServerAddressAndPort());
});
context = new ClassPathXmlApplicationContext("classpath:org/redisson/spring/support/namespace.xml");

@ -180,16 +180,14 @@ public class SpringNamespaceWikiTest {
.port(6381)
.randomDir()
.nosave());
List<RedisRunner.RedisProcess> nodes = clusterRunner.run();
ClusterRunner.ClusterProcesses cluster = clusterRunner.run();
try {
((ConfigurableApplicationContext)
new ClassPathXmlApplicationContext("classpath:org/redisson/spring/support/namespace_wiki_cluster.xml"))
.close();
} finally {
for (RedisRunner.RedisProcess node : nodes) {
node.stop();
}
cluster.shutdown();
}
}
}

Loading…
Cancel
Save