Merge remote-tracking branch 'origin/feature/travis-ci' into feature/travis-ci

pull/509/head
jackygurui 9 years ago
commit d1c90f1ee2

2
.gitignore vendored

@ -10,3 +10,5 @@
/*.cmd
nb-configuration.xml
nbactions-unit-test.xml

@ -9,7 +9,7 @@
<goal>test</goal>
</goals>
<properties>
<argLine>"-DredisBinary=/usr/local/bin/redis-server"</argLine>
<argLine>"-DredisBinary=/Applications/EAPManager.app/Contents/Resources/redis/redis-server"</argLine>
</properties>
</action>
<action>
@ -23,7 +23,7 @@
</goals>
<properties>
<test>${packageClassName}</test>
<argLine>"-DredisBinary=/usr/local/bin/redis-server"</argLine>
<argLine>"-DredisBinary=/Applications/EAPManager.app/Contents/Resources/redis/redis-server"</argLine>
</properties>
</action>
<action>
@ -39,13 +39,13 @@
<exec.args>-classpath %classpath ${packageClassName}</exec.args>
<exec.executable>java</exec.executable>
<exec.classpathScope>${classPathScope}</exec.classpathScope>
<argLine>"-DredisBinary=/usr/local/bin/redis-server"</argLine>
<argLine>"-DredisBinary=/Applications/EAPManager.app/Contents/Resources/redis/redis-server"</argLine>
</properties>
</action>
<action>
<actionName>debug</actionName>
<properties>
<argLine>"-DredisBinary=/usr/local/bin/redis-server"</argLine>
<argLine>"-DredisBinary=/Applications/EAPManager.app/Contents/Resources/redis/redis-server"</argLine>
</properties>
</action>
<action>
@ -62,7 +62,7 @@
<exec.executable>java</exec.executable>
<exec.classpathScope>${classPathScope}</exec.classpathScope>
<jpda.listen>true</jpda.listen>
<argLine>"-DredisBinary=/usr/local/bin/redis-server"</argLine>
<argLine>"-DredisBinary=/Applications/EAPManager.app/Contents/Resources/redis/redis-server"</argLine>
</properties>
</action>
<action>
@ -79,13 +79,13 @@
<forkMode>once</forkMode>
<maven.surefire.debug>-Xdebug -Xrunjdwp:transport=dt_socket,server=n,address=${jpda.address}</maven.surefire.debug>
<jpda.listen>true</jpda.listen>
<argLine>"-DredisBinary=/usr/local/bin/redis-server"</argLine>
<argLine>"-DredisBinary=/Applications/EAPManager.app/Contents/Resources/redis/redis-server"</argLine>
</properties>
</action>
<action>
<actionName>profile</actionName>
<properties>
<argLine>"-DredisBinary=/usr/local/bin/redis-server"</argLine>
<argLine>"-DredisBinary=/Applications/EAPManager.app/Contents/Resources/redis/redis-server"</argLine>
</properties>
</action>
<action>
@ -101,7 +101,7 @@
<exec.args>-classpath %classpath ${packageClassName}</exec.args>
<exec.executable>java</exec.executable>
<exec.classpathScope>${classPathScope}</exec.classpathScope>
<argLine>"-DredisBinary=/usr/local/bin/redis-server"</argLine>
<argLine>"-DredisBinary=/Applications/EAPManager.app/Contents/Resources/redis/redis-server"</argLine>
</properties>
</action>
</actions>

@ -2,76 +2,68 @@ package org.redisson;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.junit.Assert;
import org.redisson.client.RedisClient;
public abstract class BaseConcurrentTest extends BaseTest {
protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
final Map<Integer, RedissonClient> instances = new HashMap<>();
final Map<Integer, RedissonClient> instances = new HashMap<Integer, RedissonClient>();
for (int i = 0; i < iterations; i++) {
instances.put(i, BaseTest.createInstance());
}
pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> instances.put(i, BaseTest.createInstance()));
});
long watch = System.currentTimeMillis();
for (int i = 0; i < iterations; i++) {
final int n = i;
executor.execute(new Runnable() {
@Override
public void run() {
RedissonClient redisson = instances.get(n);
runnable.run(redisson);
}
});
}
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
pool.awaitQuiescence(5, TimeUnit.MINUTES);
pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> runnable.run(instances.get(i)));
});
pool.shutdown();
Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES));
System.out.println("multi: " + (System.currentTimeMillis() - watch));
executor = Executors.newCachedThreadPool();
pool = new ForkJoinPool();
for (final RedissonClient redisson : instances.values()) {
executor.execute(new Runnable() {
@Override
public void run() {
redisson.shutdown();
}
});
}
pool.submit(() -> {
instances.values()
.parallelStream()
.<RedisClient>forEach((r) -> r.shutdown());
});
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
pool.shutdown();
Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES));
}
protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
final RedissonClient redisson = BaseTest.createInstance();
final RedissonClient r = BaseTest.createInstance();
long watch = System.currentTimeMillis();
for (int i = 0; i < iterations; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
runnable.run(redisson);
}
});
}
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
System.out.println(System.currentTimeMillis() - watch);
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> runnable.run(r));
});
redisson.shutdown();
}
pool.shutdown();
Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES));
System.out.println(System.currentTimeMillis() - watch);
r.shutdown();
}
}

@ -1,10 +1,12 @@
package org.redisson;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.reactivestreams.Publisher;
import org.redisson.api.RCollectionReactive;
@ -17,16 +19,18 @@ import reactor.rx.Streams;
public abstract class BaseReactiveTest {
protected static RedissonReactiveClient redisson;
protected RedissonReactiveClient redisson;
@BeforeClass
public static void beforeClass() {
@Before
public void before() throws IOException, InterruptedException {
RedisRunner.startDefaultRedisTestInstance();
redisson = createInstance();
}
@AfterClass
public static void afterClass() {
@After
public void after() throws InterruptedException {
redisson.shutdown();
RedisRunner.shutDownDefaultRedisTestInstance();
}
public <V> Iterable<V> sync(RScoredSortedSetReactive<V> list) {
@ -75,9 +79,10 @@ public abstract class BaseReactiveTest {
return Redisson.createReactive(config);
}
@After
public void after() {
sync(redisson.getKeys().flushdb());
}
// @After
// public void after() throws InterruptedException, IOException {
// afterClass();
// beforeClass();
// }
}

@ -1,35 +1,23 @@
package org.redisson;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
public abstract class BaseTest {
protected static RedissonClient redisson;
protected static RedisRunner.RedisProcess redis;
protected RedissonClient redisson;
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
System.out.println("Starting up...");
redis = defaultRedisTestInstance();
@Before
public void before() throws IOException, InterruptedException {
RedisRunner.startDefaultRedisTestInstance();
redisson = createInstance();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
afterClass();
} catch (InterruptedException ex) {
}
}));
}
@AfterClass
public static void afterClass() throws InterruptedException {
System.out.println("Shutting down...");
@After
public void after() throws InterruptedException {
redisson.shutdown();
redis.stop();
RedisRunner.shutDownDefaultRedisTestInstance();
}
public static Config createConfig() {
@ -55,13 +43,9 @@ public abstract class BaseTest {
return Redisson.create(config);
}
@Before
public void before() {
redisson.getKeys().flushall();
}
private static RedisRunner.RedisProcess defaultRedisTestInstance() throws IOException, InterruptedException {
return new RedisRunner().run();
}
// @After
// public void after() throws InterruptedException, IOException {
// afterClass();
// beforeClass();
// }
}

@ -28,43 +28,26 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import static org.redisson.BaseTest.afterClass;
public class RedisClientTest {
protected static RedisRunner.RedisProcess redis;
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
System.out.println("Starting up...");
redis = defaultRedisTestInstance();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
afterClass();
} catch (InterruptedException ex) {
}
}));
}
@AfterClass
public static void afterClass() throws InterruptedException {
System.out.println("Shutting down...");
redis.stop();
@Before
public static void before() throws IOException, InterruptedException {
RedisRunner.startDefaultRedisTestInstance();
}
private static RedisRunner.RedisProcess defaultRedisTestInstance() throws IOException, InterruptedException {
return new RedisRunner().run();
@After
public static void after() throws InterruptedException {
RedisRunner.shutDownDefaultRedisTestInstance();
}
@Before
public void before() {
System.out.println("Cleaning up...");
RedisClient c = new RedisClient("localhost", 6379);
c.connect().sync(RedisCommands.FLUSHDB);
}
// @After
// public void after() throws InterruptedException, IOException {
// afterClass();
// beforeClass();
// }
@Test
public void testConnectAsync() throws InterruptedException {

@ -6,12 +6,21 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Inet4Address;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
public class RedisRunner {
@ -161,9 +170,16 @@ public class RedisRunner {
}
private static final String redisBinary;
private final LinkedHashMap<REDIS_OPTIONS, String> options = new LinkedHashMap<>();
private static RedisRunner.RedisProcess defaultRedisInstance;
private static int defaultRedisInstanceExitCode;
private String defaultDir = Paths.get("").toString();
private boolean nosave = false;
private boolean randomDir = false;
private ArrayList<String> bindAddr = new ArrayList<>();
private int port = 6379;
static {
redisBinary = Optional.ofNullable(System.getProperty("redisBinary"))
.orElse("C:\\Devel\\projects\\redis\\Redis-x64-3.0.500\\redis-server.exe");
@ -190,17 +206,17 @@ public class RedisRunner {
*/
public static RedisProcess runRedisWithConfigFile(String configPath) throws IOException, InterruptedException {
URL resource = RedisRunner.class.getResource(configPath);
return runWithOptions(redisBinary, resource.getFile());
return runWithOptions(new RedisRunner(), redisBinary, resource.getFile());
}
private static RedisProcess runWithOptions(String... options) throws IOException, InterruptedException {
private static RedisProcess runWithOptions(RedisRunner runner, String... options) throws IOException, InterruptedException {
List<String> launchOptions = Arrays.stream(options)
.map(x -> Arrays.asList(x.split(" "))).flatMap(x -> x.stream())
.collect(Collectors.toList());
.map(x -> Arrays.asList(x.split(" "))).flatMap(x -> x.stream())
.collect(Collectors.toList());
System.out.println("REDIS LAUNCH OPTIONS: " + Arrays.toString(launchOptions.toArray()));
ProcessBuilder master = new ProcessBuilder(launchOptions)
.redirectErrorStream(true)
.directory(new File(redisBinary).getParentFile());
.directory(new File(System.getProperty("java.io.tmpdir")));
Process p = master.start();
new Thread(() -> {
BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
@ -214,11 +230,14 @@ public class RedisRunner {
}
}).start();
Thread.sleep(3000);
return new RedisProcess(p);
return new RedisProcess(p, runner);
}
public RedisProcess run() throws IOException, InterruptedException {
return runWithOptions(options.values().toArray(new String[0]));
if (!options.containsKey(REDIS_OPTIONS.DIR)) {
options.put(REDIS_OPTIONS.DIR, defaultDir);
}
return runWithOptions(this, options.values().toArray(new String[0]));
}
private void addConfigOption(REDIS_OPTIONS option, Object... args) {
@ -251,9 +270,14 @@ public class RedisRunner {
}
public RedisRunner port(int port) {
this.port = port;
addConfigOption(REDIS_OPTIONS.PORT, port);
return this;
}
public int getPort() {
return this.port;
}
public RedisRunner tcpBacklog(long tcpBacklog) {
addConfigOption(REDIS_OPTIONS.TCP_BACKLOG, tcpBacklog);
@ -261,10 +285,15 @@ public class RedisRunner {
}
public RedisRunner bind(String bind) {
this.bindAddr.add(bind);
addConfigOption(REDIS_OPTIONS.BIND, bind);
return this;
}
public ArrayList<String> getBindAddr() {
return this.bindAddr;
}
public RedisRunner unixsocket(String unixsocket) {
addConfigOption(REDIS_OPTIONS.UNIXSOCKET, unixsocket);
return this;
@ -316,7 +345,21 @@ public class RedisRunner {
}
public RedisRunner save(long seconds, long changes) {
addConfigOption(REDIS_OPTIONS.SAVE, seconds, changes);
if (!nosave) {
addConfigOption(REDIS_OPTIONS.SAVE, seconds, changes);
}
return this;
}
/**
* Phantom option
*
* @return RedisRunner
*/
public RedisRunner nosave() {
this.nosave = true;
options.remove(REDIS_OPTIONS.SAVE);
addConfigOption(REDIS_OPTIONS.SAVE, "");
return this;
}
@ -341,7 +384,21 @@ public class RedisRunner {
}
public RedisRunner dir(String dir) {
addConfigOption(REDIS_OPTIONS.DIR, dir);
if (!randomDir) {
addConfigOption(REDIS_OPTIONS.DIR, dir);
}
return this;
}
/**
* Phantom option
* @return RedisRunner
*/
public RedisRunner randomDir() {
this.randomDir = true;
options.remove(REDIS_OPTIONS.DIR);
makeRandomDefaultDir();
addConfigOption(REDIS_OPTIONS.DIR, defaultDir);
return this;
}
@ -609,17 +666,62 @@ public class RedisRunner {
return this;
}
public boolean isRandomDir() {
return this.randomDir;
}
public boolean isNosave() {
return this.nosave;
}
public String defaultDir() {
return this.defaultDir;
}
public boolean deleteDBfileDir() {
File f = new File(defaultDir);
if (f.exists()) {
System.out.println("RedisRunner: Deleting directory " + defaultDir);
return f.delete();
}
return false;
}
private void makeRandomDefaultDir() {
File f = new File(System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID());
if (f.exists()) {
makeRandomDefaultDir();
} else {
System.out.println("RedisRunner: Making directory " + f.getAbsolutePath());
f.mkdirs();
this.defaultDir = f.getAbsolutePath();
}
}
public static final class RedisProcess {
private final Process redisProcess;
private final RedisRunner runner;
private RedisProcess(Process redisProcess) {
private RedisProcess(Process redisProcess, RedisRunner runner) {
this.redisProcess = redisProcess;
this.runner = runner;
}
public int stop() throws InterruptedException {
if (runner.isNosave()) {
ArrayList<String> b = runner.getBindAddr();
RedisClient c = new RedisClient(b.size() > 0 ? b.get(0) : "localhost", runner.getPort());
c.connect()
.async(new RedisStrictCommand<Void>("SHUTDOWN", "NOSAVE", new VoidReplayConvertor()))
.await(3, TimeUnit.SECONDS);
c.shutdown();
}
redisProcess.destroy();
int exitCode = redisProcess.waitFor();
int exitCode = redisProcess.isAlive() ? redisProcess.waitFor() : redisProcess.exitValue();
if (runner.isRandomDir()) {
runner.deleteDBfileDir();
}
return exitCode == 1 && isWindows() ? 0 : exitCode;
}
@ -632,4 +734,32 @@ public class RedisRunner {
}
}
public static RedisRunner.RedisProcess startDefaultRedisTestInstance() throws IOException, InterruptedException {
if (defaultRedisInstance == null) {
System.out.println("REDIS PROCESS: Starting up default instance...");
defaultRedisInstance = new RedisRunner().nosave().randomDir().run();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
shutDownDefaultRedisTestInstance();
} catch (InterruptedException ex) {
}
}));
}
return defaultRedisInstance;
}
public static int shutDownDefaultRedisTestInstance() throws InterruptedException {
if (defaultRedisInstance != null) {
System.out.println("REDIS PROCESS: Shutting down default instance...");
try {
defaultRedisInstanceExitCode = defaultRedisInstance.stop();
} finally {
defaultRedisInstance = null;
}
} else {
System.out.println("REDIS PROCESS: Default instance is already down with an exit code " + defaultRedisInstanceExitCode);
}
return defaultRedisInstanceExitCode;
}
}

@ -29,7 +29,11 @@ public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testPollWithBrokenConnection() throws IOException, InterruptedException, ExecutionException {
RedisProcess runner = new RedisRunner().port(6319).run();
RedisProcess runner = new RedisRunner()
.port(6319)
.nosave()
.randomDir()
.run();
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6319");
@ -45,7 +49,11 @@ public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testPollReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException {
RedisProcess runner = new RedisRunner().port(6319).run();
RedisProcess runner = new RedisRunner()
.port(6319)
.nosave()
.randomDir()
.run();
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6319");
@ -56,7 +64,11 @@ public class RedissonBlockingQueueTest extends BaseTest {
f.await(1, TimeUnit.SECONDS);
runner.stop();
runner = new RedisRunner().port(6319).run();
runner = new RedisRunner()
.port(6319)
.nosave()
.randomDir()
.run();
queue1.put(123);
// check connection rotation
@ -73,7 +85,11 @@ public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testTakeReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException {
RedisProcess runner = new RedisRunner().port(6319).run();
RedisProcess runner = new RedisRunner()
.port(6319)
.nosave()
.randomDir()
.run();
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6319");
@ -83,7 +99,11 @@ public class RedissonBlockingQueueTest extends BaseTest {
f.await(1, TimeUnit.SECONDS);
runner.stop();
runner = new RedisRunner().port(6319).run();
runner = new RedisRunner()
.port(6319)
.nosave()
.randomDir()
.run();
queue1.put(123);
// check connection rotation

@ -75,18 +75,24 @@ public class RedissonMultiLockTest {
private RedisProcess redisTestMultilockInstance1() throws IOException, InterruptedException {
return new RedisRunner()
.nosave()
.randomDir()
.port(6320)
.run();
}
private RedisProcess redisTestMultilockInstance2() throws IOException, InterruptedException {
return new RedisRunner()
.nosave()
.randomDir()
.port(6321)
.run();
}
private RedisProcess redisTestMultilockInstance3() throws IOException, InterruptedException {
return new RedisRunner()
.nosave()
.randomDir()
.port(6322)
.run();
}

@ -276,12 +276,16 @@ public class RedissonTest {
private RedisProcess redisTestSmallMemory() throws IOException, InterruptedException {
return new RedisRunner()
.maxmemory("1mb")
.nosave()
.randomDir()
.port(6319)
.run();
}
private RedisProcess redisTestConnection() throws IOException, InterruptedException {
return new RedisRunner()
.nosave()
.randomDir()
.port(6319)
.run();
}

Loading…
Cancel
Save