Merge pull request #509 from jackygurui/feature/travis-ci

Feature/travis ci
pull/510/head
Nikita Koksharov 9 years ago
commit 8fbeca9d5f

2
.gitignore vendored

@ -10,3 +10,5 @@
/*.cmd
nb-configuration.xml
nbactions-unit-test.xml

@ -20,9 +20,189 @@ jdk:
- oraclejdk8
env:
matrix:
- REDIS_VERSION=3.0.7
- REDIS_VERSION=2.8.24
- REDIS_VERSION=3.2.0-rc3
# Current
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.ConcurrentRedissonSortedSetTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedisClientTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonAtomicDoubleTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonAtomicLongReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonAtomicLongTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonBatchTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonBitSetReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonBitSetTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonBlockingDequeTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonBlockingQueueReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonBlockingQueueTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonBloomFilterTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonBucketReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonBucketTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonBucketsTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonCodecTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonConcurrentMapTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonCountDownLatchConcurrentTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonCountDownLatchTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonDequeReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonDequeTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonGeoTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonHyperLogLogReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonHyperLogLogTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonKeysReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonKeysTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonLexSortedSetReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonLexSortedSetTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonListMultimapCacheTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonListMultimapTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonListReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonListTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonLockTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonMapCacheReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonMapCacheTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonMapReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonMapTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonMultiLockTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonQueueReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonQueueTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonReadWriteLockTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonRemoteServiceTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonScoredSortedSetReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonScoredSortedSetTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonScriptReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonScriptTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonSemaphoreTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonSetCacheReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonSetCacheTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonSetMultimapCacheTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonSetMultimapTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonSetReactiveTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonSetTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonSortedSetTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonTopicPatternTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonTopicTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.RedissonTwoLockedThread
# - REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.TimeoutTest
- REDIS_VERSION=3.2.0 REDISSON_TEST=org.redisson.spring.cache.RedissonSpringCacheTest
# Legacy
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.ConcurrentRedissonSortedSetTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedisClientTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonAtomicDoubleTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonAtomicLongReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonAtomicLongTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonBatchTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonBitSetReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonBitSetTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonBlockingDequeTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonBlockingQueueReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonBlockingQueueTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonBloomFilterTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonBucketReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonBucketTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonBucketsTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonCodecTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonConcurrentMapTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonCountDownLatchConcurrentTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonCountDownLatchTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonDequeReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonDequeTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonGeoTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonHyperLogLogReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonHyperLogLogTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonKeysReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonKeysTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonLexSortedSetReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonLexSortedSetTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonListMultimapCacheTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonListMultimapTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonListReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonListTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonLockTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonMapCacheReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonMapCacheTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonMapReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonMapTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonMultiLockTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonQueueReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonQueueTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonReadWriteLockTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonRemoteServiceTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonScoredSortedSetReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonScoredSortedSetTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonScriptReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonScriptTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonSemaphoreTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonSetCacheReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonSetCacheTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonSetMultimapCacheTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonSetMultimapTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonSetReactiveTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonSetTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonSortedSetTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonTopicPatternTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonTopicTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.RedissonTwoLockedThread
# - REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.TimeoutTest
- REDIS_VERSION=3.0.7 REDISSON_TEST=org.redisson.spring.cache.RedissonSpringCacheTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.ConcurrentRedissonSortedSetTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedisClientTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonAtomicDoubleTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonAtomicLongReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonAtomicLongTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonBatchTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonBitSetReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonBitSetTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonBlockingDequeTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonBlockingQueueReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonBlockingQueueTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonBloomFilterTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonBucketReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonBucketTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonBucketsTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonCodecTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonConcurrentMapTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonCountDownLatchConcurrentTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonCountDownLatchTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonDequeReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonDequeTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonGeoTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonHyperLogLogReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonHyperLogLogTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonKeysReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonKeysTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonLexSortedSetReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonLexSortedSetTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonListMultimapCacheTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonListMultimapTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonListReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonListTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonLockTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonMapCacheReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonMapCacheTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonMapReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonMapTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonMultiLockTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonQueueReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonQueueTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonReadWriteLockTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonRemoteServiceTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonScoredSortedSetReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonScoredSortedSetTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonScriptReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonScriptTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonSemaphoreTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonSetCacheReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonSetCacheTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonSetMultimapCacheTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonSetMultimapTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonSetReactiveTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonSetTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonSortedSetTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonTopicPatternTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonTopicTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.RedissonTwoLockedThread
# - REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.TimeoutTest
- REDIS_VERSION=2.8.24 REDISSON_TEST=org.redisson.spring.cache.RedissonSpringCacheTest
# Unstable
cache:
directories:
- $HOME/.m2
@ -37,4 +217,5 @@ before_script:
- $REDIS_BIN/redis-cli PING
- export REDIS_VERSION="$(redis-cli INFO SERVER | sed -n 2p)"
- echo $REDIS_VERSION
script: mvn -DargLine="-DredisBinary=$REDIS_BIN/redis-server -DtravisEnv=true" -Punit-test -Ptravis clean verify
- redis-cli SHUTDOWN NOSAVE
script: mvn -Dtest=$REDISSON_TEST -Dsurefire.rerunFailingTestsCount=5 -DargLine="-Xmx2g -DredisBinary=$REDIS_BIN/redis-server -DtravisEnv=true" -Punit-test clean test -e -X

@ -352,6 +352,14 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<properties>
<property>
<name>listener</name>
<value>org.redisson.RedissonTestRunListener</value>
</property>
</properties>
</configuration>
</plugin>
<plugin>

@ -37,9 +37,12 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import java.util.Map;
import org.redisson.client.protocol.RedisCommands;
public class RedisClient {
@ -150,6 +153,37 @@ public class RedisClient {
return channels.close();
}
/**
* Execute INFO SERVER operation.
*
* @return Map extracted from each response line splitting by ':' symbol
*/
public Map<String, String> serverInfo() {
try {
return serverInfoAsync().sync().get();
} catch (Exception e) {
throw new RedisConnectionException("Unable to retrieve server into from: " + addr, e);
}
}
/**
* Asynchronously execute INFO SERVER operation.
*
* @return A future for a map extracted from each response line splitting by
* ':' symbol
*/
public Future<Map<String, String>> serverInfoAsync() {
final RedisConnection connection = connect();
Promise<Map<String, String>> async = (Promise) connection.async(RedisCommands.SERVER_INFO);
async.addListener(new GenericFutureListener<Promise<Map<String, String>>>() {
@Override
public void operationComplete(Promise<Map<String, String>> future) throws Exception {
connection.closeAsync();
}
});
return async;
}
@Override
public String toString() {
return "[addr=" + addr + "]";

@ -260,4 +260,5 @@ public interface RedisCommands {
RedisStrictCommand<String> INFO_REPLICATION = new RedisStrictCommand<String>("INFO", "replication", new StringDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_PERSISTENCE = new RedisStrictCommand<Map<String, String>>("INFO", "persistence", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> SERVER_INFO = new RedisStrictCommand<Map<String, String>>("INFO", "SERVER", new StringMapDataDecoder());
}

@ -38,6 +38,8 @@ public interface RScriptAsync {
<R> Future<R> evalAsync(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
<R> Future<R> evalAsync(Mode mode, Codec codec, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
<R> Future<R> evalAsync(String key, Mode mode, Codec codec, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
<R> Future<R> evalAsync(String key, Mode mode, Codec codec, String luaScript, ReturnType returnType, List<Object> keys, Object... values);

@ -4,14 +4,55 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.junit.Assert;
import org.redisson.client.RedisClient;
public abstract class BaseConcurrentTest extends BaseTest {
protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
System.out.println("Multi Instance Concurrent Job Interation: " + iterations);
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
final Map<Integer, RedissonClient> instances = new HashMap<>();
pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> instances.put(i, BaseTest.createInstance()));
});
long watch = System.currentTimeMillis();
pool.awaitQuiescence(5, TimeUnit.MINUTES);
pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> runnable.run(instances.get(i)));
});
pool.shutdown();
Assert.assertTrue(pool.awaitTermination(RedissonRuntimeEnvironment.isTravis ? 10 : 3, TimeUnit.MINUTES));
System.out.println("multi: " + (System.currentTimeMillis() - watch));
pool = new ForkJoinPool();
pool.submit(() -> {
instances.values()
.parallelStream()
.<RedisClient>forEach((r) -> r.shutdown());
});
pool.shutdown();
Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES));
}
protected void testMultiInstanceConcurrencySequentiallyLaunched(int iterations, final RedissonRunnable runnable) throws InterruptedException {
System.out.println("Multi Instance Concurrent Job Interation: " + iterations);
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
final Map<Integer, RedissonClient> instances = new HashMap<Integer, RedissonClient>();
for (int i = 0; i < iterations; i++) {
@ -21,13 +62,7 @@ public abstract class BaseConcurrentTest extends BaseTest {
long watch = System.currentTimeMillis();
for (int i = 0; i < iterations; i++) {
final int n = i;
executor.execute(new Runnable() {
@Override
public void run() {
RedissonClient redisson = instances.get(n);
runnable.run(redisson);
}
});
executor.execute(() -> runnable.run(instances.get(n)));
}
executor.shutdown();
@ -38,12 +73,7 @@ public abstract class BaseConcurrentTest extends BaseTest {
executor = Executors.newCachedThreadPool();
for (final RedissonClient redisson : instances.values()) {
executor.execute(new Runnable() {
@Override
public void run() {
redisson.shutdown();
}
});
executor.execute(() -> redisson.shutdown());
}
executor.shutdown();
@ -51,27 +81,26 @@ public abstract class BaseConcurrentTest extends BaseTest {
}
protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
final RedissonClient redisson = BaseTest.createInstance();
System.out.println("Single Instance Concurrent Job Interation: " + iterations);
final RedissonClient r = BaseTest.createInstance();
long watch = System.currentTimeMillis();
for (int i = 0; i < iterations; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
runnable.run(redisson);
}
});
}
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
System.out.println(System.currentTimeMillis() - watch);
pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> {
runnable.run(r);
});
});
redisson.shutdown();
}
pool.shutdown();
Assert.assertTrue(pool.awaitTermination(RedissonRuntimeEnvironment.isTravis ? 10 : 3, TimeUnit.MINUTES));
System.out.println(System.currentTimeMillis() - watch);
r.shutdown();
}
}

@ -1,10 +1,11 @@
package org.redisson;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.reactivestreams.Publisher;
import org.redisson.api.RCollectionReactive;
@ -12,21 +13,48 @@ import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RedissonReactiveClient;
import reactor.rx.Promise;
import reactor.rx.Stream;
import reactor.rx.Streams;
public abstract class BaseReactiveTest {
protected static RedissonReactiveClient redisson;
protected RedissonReactiveClient redisson;
protected static RedissonReactiveClient defaultRedisson;
@BeforeClass
public static void beforeClass() {
redisson = createInstance();
public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
defaultRedisson = createInstance();
}
}
@AfterClass
public static void afterClass() {
redisson.shutdown();
public static void afterClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
defaultRedisson.shutdown();
}
}
@Before
public void before() throws IOException, InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
redisson = createInstance();
} else {
if (redisson == null) {
redisson = defaultRedisson;
}
redisson.getKeys().flushall();
}
}
@After
public void after() throws InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
redisson.shutdown();
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
public <V> Iterable<V> sync(RScoredSortedSetReactive<V> list) {
@ -75,9 +103,4 @@ public abstract class BaseReactiveTest {
return Redisson.createReactive(config);
}
@After
public void after() {
sync(redisson.getKeys().flushdb());
}
}

@ -1,21 +1,51 @@
package org.redisson;
import java.io.IOException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
public abstract class BaseTest {
protected static RedissonClient redisson;
protected RedissonClient redisson;
protected static RedissonClient defaultRedisson;
@BeforeClass
public static void beforeClass() {
redisson = createInstance();
public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
defaultRedisson = createInstance();
}
}
@AfterClass
public static void afterClass() {
redisson.shutdown();
public static void afterClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
defaultRedisson.shutdown();
}
}
@Before
public void before() throws IOException, InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
redisson = createInstance();
} else {
if (redisson == null) {
redisson = defaultRedisson;
}
redisson.getKeys().flushall();
}
}
@After
public void after() throws InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
redisson.shutdown();
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
public static Config createConfig() {
@ -41,9 +71,4 @@ public abstract class BaseTest {
return Redisson.create(config);
}
@Before
public void before() {
redisson.getKeys().flushall();
}
}

@ -23,25 +23,21 @@ public class ConcurrentRedissonSortedSetTest extends BaseConcurrentTest {
int length = 5000;
final List<Integer> elements = new ArrayList<Integer>();
for (int i = 1; i < length+1; i++) {
for (int i = 1; i < length + 1; i++) {
elements.add(i);
}
Collections.shuffle(elements);
final AtomicInteger counter = new AtomicInteger(-1);
testSingleInstanceConcurrency(length, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
RSortedSet<Integer> set = redisson.getSortedSet(name);
int c = counter.incrementAndGet();
Integer element = elements.get(c);
Assert.assertTrue(set.add(element));
}
testSingleInstanceConcurrency(length, rc -> {
RSortedSet<Integer> set = rc.getSortedSet(name);
int c = counter.incrementAndGet();
Integer element = elements.get(c);
Assert.assertTrue(set.add(element));
});
// for (Integer integer : map) {
// System.out.println("int: " + integer);
// }
Collections.sort(elements);
Integer[] p = elements.toArray(new Integer[elements.size()]);
MatcherAssert.assertThat(map, Matchers.contains(p));
@ -60,13 +56,10 @@ public class ConcurrentRedissonSortedSetTest extends BaseConcurrentTest {
int length = 1000;
final AtomicInteger counter = new AtomicInteger();
testSingleInstanceConcurrency(length, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
RSortedSet<Integer> set = redisson.getSortedSet(name);
int c = counter.decrementAndGet();
Assert.assertTrue(set.add(c));
}
testSingleInstanceConcurrency(length, rc -> {
RSortedSet<Integer> set = rc.getSortedSet(name);
int c = counter.decrementAndGet();
Assert.assertTrue(set.add(c));
});
List<Integer> elements = new ArrayList<Integer>();

@ -27,23 +27,53 @@ import org.redisson.client.protocol.pubsub.PubSubType;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
public class RedisClientTest {
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
}
}
@AfterClass
public static void afterClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
@Before
public void before() throws IOException, InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
}
}
@After
public void after() throws InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
@Test
public void testConnectAsync() throws InterruptedException {
RedisClient c = new RedisClient("localhost", 6379);
Future<RedisConnection> f = c.connectAsync();
final CountDownLatch l = new CountDownLatch(1);
f.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
RedisConnection conn = future.get();
assertThat(conn.sync(RedisCommands.PING)).isEqualTo("PONG");
l.countDown();
}
f.addListener((FutureListener<RedisConnection>) future -> {
RedisConnection conn = future.get();
assertThat(conn.sync(RedisCommands.PING)).isEqualTo("PONG");
l.countDown();
});
l.await();
l.await(10, TimeUnit.SECONDS);
}
@Test
@ -71,7 +101,7 @@ public class RedisClientTest {
});
pubSubConnection.subscribe(StringCodec.INSTANCE, "test1", "test2");
latch.await();
latch.await(10, TimeUnit.SECONDS);
}
@Test
@ -80,13 +110,10 @@ public class RedisClientTest {
final RedisConnection conn = c.connect();
conn.sync(StringCodec.INSTANCE, RedisCommands.SET, "test", 0);
ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
for (int i = 0; i < 100000; i++) {
pool.execute(new Runnable() {
@Override
public void run() {
conn.async(StringCodec.INSTANCE, RedisCommands.INCR, "test");
}
pool.execute(() -> {
conn.async(StringCodec.INSTANCE, RedisCommands.INCR, "test");
});
}
@ -94,7 +121,7 @@ public class RedisClientTest {
assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
assertThat((Long)conn.sync(LongCodec.INSTANCE, RedisCommands.GET, "test")).isEqualTo(100000);
assertThat((Long) conn.sync(LongCodec.INSTANCE, RedisCommands.GET, "test")).isEqualTo(100000);
conn.sync(RedisCommands.FLUSHDB);
}

@ -6,12 +6,18 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Inet4Address;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
public class RedisRunner {
@ -160,17 +166,18 @@ public class RedisRunner {
A
}
private static final String redisBinary;
private final LinkedHashMap<REDIS_OPTIONS, String> options = new LinkedHashMap<>();
private static RedisRunner.RedisProcess defaultRedisInstance;
private static int defaultRedisInstanceExitCode;
static {
redisBinary = Optional.ofNullable(System.getProperty("redisBinary"))
.orElse("C:\\Devel\\projects\\redis\\Redis-x64-3.0.500\\redis-server.exe");
}
private String defaultDir = Paths.get("").toString();
private boolean nosave = false;
private boolean randomDir = false;
private ArrayList<String> bindAddr = new ArrayList<>();
private int port = 6379;
{
this.options.put(REDIS_OPTIONS.BINARY_PATH, redisBinary);
this.options.put(REDIS_OPTIONS.BINARY_PATH, RedissonRuntimeEnvironment.redisBinaryPath);
}
/**
@ -190,35 +197,38 @@ public class RedisRunner {
*/
public static RedisProcess runRedisWithConfigFile(String configPath) throws IOException, InterruptedException {
URL resource = RedisRunner.class.getResource(configPath);
return runWithOptions(redisBinary, resource.getFile());
return runWithOptions(new RedisRunner(), RedissonRuntimeEnvironment.redisBinaryPath, resource.getFile());
}
private static RedisProcess runWithOptions(String... options) throws IOException, InterruptedException {
private static RedisProcess runWithOptions(RedisRunner runner, String... options) throws IOException, InterruptedException {
List<String> launchOptions = Arrays.stream(options)
.map(x -> Arrays.asList(x.split(" "))).flatMap(x -> x.stream())
.collect(Collectors.toList());
.map(x -> Arrays.asList(x.split(" "))).flatMap(x -> x.stream())
.collect(Collectors.toList());
System.out.println("REDIS LAUNCH OPTIONS: " + Arrays.toString(launchOptions.toArray()));
ProcessBuilder master = new ProcessBuilder(launchOptions)
.redirectErrorStream(true)
.directory(new File(redisBinary).getParentFile());
.directory(new File(RedissonRuntimeEnvironment.tempDir));
Process p = master.start();
new Thread(() -> {
BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line;
try {
while (p.isAlive() && (line = reader.readLine()) != null) {
while (p.isAlive() && (line = reader.readLine()) != null && !RedissonRuntimeEnvironment.isTravis) {
System.out.println("REDIS PROCESS: " + line);
}
} catch (IOException ex) {
System.out.println("Exception: " + ex.getLocalizedMessage());
}
}).start();
Thread.sleep(1000);
return new RedisProcess(p);
Thread.sleep(1500);
return new RedisProcess(p, runner);
}
public RedisProcess run() throws IOException, InterruptedException {
return runWithOptions(options.values().toArray(new String[0]));
if (!options.containsKey(REDIS_OPTIONS.DIR)) {
options.put(REDIS_OPTIONS.DIR, defaultDir);
}
return runWithOptions(this, options.values().toArray(new String[0]));
}
private void addConfigOption(REDIS_OPTIONS option, Object... args) {
@ -251,20 +261,30 @@ public class RedisRunner {
}
public RedisRunner port(int port) {
this.port = port;
addConfigOption(REDIS_OPTIONS.PORT, port);
return this;
}
public int getPort() {
return this.port;
}
public RedisRunner tcpBacklog(long tcpBacklog) {
addConfigOption(REDIS_OPTIONS.TCP_BACKLOG, tcpBacklog);
return this;
}
public RedisRunner bind(String bind) {
this.bindAddr.add(bind);
addConfigOption(REDIS_OPTIONS.BIND, bind);
return this;
}
public ArrayList<String> getBindAddr() {
return this.bindAddr;
}
public RedisRunner unixsocket(String unixsocket) {
addConfigOption(REDIS_OPTIONS.UNIXSOCKET, unixsocket);
return this;
@ -316,7 +336,21 @@ public class RedisRunner {
}
public RedisRunner save(long seconds, long changes) {
addConfigOption(REDIS_OPTIONS.SAVE, seconds, changes);
if (!nosave) {
addConfigOption(REDIS_OPTIONS.SAVE, seconds, changes);
}
return this;
}
/**
* Phantom option
*
* @return RedisRunner
*/
public RedisRunner nosave() {
this.nosave = true;
options.remove(REDIS_OPTIONS.SAVE);
addConfigOption(REDIS_OPTIONS.SAVE, "''");
return this;
}
@ -341,7 +375,22 @@ public class RedisRunner {
}
public RedisRunner dir(String dir) {
addConfigOption(REDIS_OPTIONS.DIR, dir);
if (!randomDir) {
addConfigOption(REDIS_OPTIONS.DIR, dir);
}
return this;
}
/**
* Phantom option
*
* @return RedisRunner
*/
public RedisRunner randomDir() {
this.randomDir = true;
options.remove(REDIS_OPTIONS.DIR);
makeRandomDefaultDir();
addConfigOption(REDIS_OPTIONS.DIR, defaultDir);
return this;
}
@ -609,27 +658,127 @@ public class RedisRunner {
return this;
}
public boolean isRandomDir() {
return this.randomDir;
}
public boolean isNosave() {
return this.nosave;
}
public String defaultDir() {
return this.defaultDir;
}
public String getInitialBindAddr() {
return bindAddr.size() > 0 ? bindAddr.get(0) : "localhost";
}
public boolean deleteDBfileDir() {
File f = new File(defaultDir);
if (f.exists()) {
System.out.println("REDIS RUNNER: Deleting directory " + defaultDir);
return f.delete();
}
return false;
}
private void makeRandomDefaultDir() {
File f = new File(RedissonRuntimeEnvironment.tempDir + "/" + UUID.randomUUID());
if (f.exists()) {
makeRandomDefaultDir();
} else {
System.out.println("REDIS RUNNER: Making directory " + f.getAbsolutePath());
f.mkdirs();
this.defaultDir = f.getAbsolutePath();
}
}
public static final class RedisProcess {
private final Process redisProcess;
private RedisProcess(Process redisProcess) {
private final RedisRunner runner;
private RedisVersion redisVersion;
private RedisProcess(Process redisProcess, RedisRunner runner) {
this.redisProcess = redisProcess;
this.runner = runner;
}
public int stop() throws InterruptedException {
if (runner.isNosave() && !runner.isRandomDir()) {
RedisClient c = createDefaultRedisClientInstance();
RedisConnection connection = c.connect();
connection.async(new RedisStrictCommand<Void>("SHUTDOWN", "NOSAVE", new VoidReplayConvertor()))
.await(3, TimeUnit.SECONDS);
c.shutdown();
connection.closeAsync().syncUninterruptibly();
}
redisProcess.destroy();
int exitCode = redisProcess.waitFor();
return exitCode == 1 && isWindows() ? 0 : exitCode;
int exitCode = redisProcess.isAlive() ? redisProcess.waitFor() : redisProcess.exitValue();
if (runner.isRandomDir()) {
runner.deleteDBfileDir();
}
return exitCode == 1 && RedissonRuntimeEnvironment.isWindows ? 0 : exitCode;
}
public Process getRedisProcess() {
return redisProcess;
}
private boolean isWindows() {
return System.getProperty("os.name", "generic").toLowerCase(Locale.ENGLISH).contains("win");
public RedisClient createRedisClientInstance() {
if (redisProcess.isAlive()) {
return new RedisClient(runner.getInitialBindAddr(), runner.getPort());
}
throw new IllegalStateException("Redis server instance is not running.");
}
public RedisVersion getRedisVersion() {
if (redisVersion == null) {
redisVersion = new RedisVersion(createRedisClientInstance().serverInfo().get("redis_version"));
}
return redisVersion;
}
}
public static RedisRunner.RedisProcess startDefaultRedisServerInstance() throws IOException, InterruptedException {
if (defaultRedisInstance == null) {
System.out.println("REDIS RUNNER: Starting up default instance...");
defaultRedisInstance = new RedisRunner().nosave().randomDir().run();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
shutDownDefaultRedisServerInstance();
} catch (InterruptedException ex) {
}
}));
}
return defaultRedisInstance;
}
public static int shutDownDefaultRedisServerInstance() throws InterruptedException {
if (defaultRedisInstance != null) {
System.out.println("REDIS RUNNER: Shutting down default instance...");
try {
defaultRedisInstanceExitCode = defaultRedisInstance.stop();
} finally {
defaultRedisInstance = null;
}
} else {
System.out.println("REDIS RUNNER: Default instance is already down with an exit code " + defaultRedisInstanceExitCode);
}
return defaultRedisInstanceExitCode;
}
public static boolean isDefaultRedisServerInstanceRunning() {
return defaultRedisInstance != null && defaultRedisInstance.redisProcess.isAlive();
}
public static RedisClient createDefaultRedisClientInstance() {
return defaultRedisInstance.createRedisClientInstance();
}
public static RedisRunner.RedisProcess getDefaultRedisServerInstance() {
return defaultRedisInstance;
}
}

@ -0,0 +1,58 @@
package org.redisson;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
*
* @author Jack
*/
public class RedisVersion implements Comparable<RedisVersion>{
private final String fullVersion;
private final Integer majorVersion;
private final Integer minorVersion;
private final Integer patchVersion;
public RedisVersion(String fullVersion) {
this.fullVersion = fullVersion;
Matcher matcher = Pattern.compile("^([\\d]+)\\.([\\d]+)\\.([\\d]+)$").matcher(fullVersion);
matcher.find();
majorVersion = Integer.parseInt(matcher.group(1));
minorVersion = Integer.parseInt(matcher.group(2));
patchVersion = Integer.parseInt(matcher.group(3));
}
public String getFullVersion() {
return fullVersion;
}
public int getMajorVersion() {
return majorVersion;
}
public int getMinorVersion() {
return minorVersion;
}
public int getPatchVersion() {
return patchVersion;
}
@Override
public int compareTo(RedisVersion o) {
int ma = this.majorVersion.compareTo(o.majorVersion);
int mi = this.minorVersion.compareTo(o.minorVersion);
int pa = this.patchVersion.compareTo(o.patchVersion);
return ma != 0 ? ma : mi != 0 ? mi : pa;
}
public int compareTo(String redisVersion) {
return this.compareTo(new RedisVersion(redisVersion));
}
public static int compareTo(String redisVersion1, String redisVersion2) {
return new RedisVersion(redisVersion1).compareTo(redisVersion2);
}
}

@ -94,19 +94,16 @@ public class RedissonBlockingDequeTest extends BaseTest {
@Test
public void testTakeFirstAwait() throws InterruptedException {
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("queue:take");
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("queue:take");
try {
deque.putFirst(1);
deque.putFirst(2);
deque.putLast(3);
deque.putLast(4);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingDeque<Integer> deque1 = redisson.getBlockingDeque("queue:take");
try {
deque1.putFirst(1);
deque1.putFirst(2);
deque1.putLast(3);
deque1.putLast(4);
}catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, 10, TimeUnit.SECONDS);
@ -122,19 +119,16 @@ public class RedissonBlockingDequeTest extends BaseTest {
@Test
public void testTakeLastAwait() throws InterruptedException {
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("queue:take");
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("queue:take");
try {
deque.putFirst(1);
deque.putFirst(2);
deque.putLast(3);
deque.putLast(4);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingDeque<Integer> deque1 = redisson.getBlockingDeque("queue:take");
try {
deque1.putFirst(1);
deque1.putFirst(2);
deque1.putLast(3);
deque1.putLast(4);
}catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, 10, TimeUnit.SECONDS);

@ -23,15 +23,12 @@ public class RedissonBlockingQueueReactiveTest extends BaseReactiveTest {
@Test
public void testPollFromAny() throws InterruptedException {
final RBlockingQueueReactive<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
RBlockingQueueReactive<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1");
RBlockingQueueReactive<Integer> queue3 = redisson.getBlockingQueue("queue:pollany2");
sync(queue3.put(2));
sync(queue1.put(1));
sync(queue2.put(3));
}
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingQueueReactive<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1");
RBlockingQueueReactive<Integer> queue3 = redisson.getBlockingQueue("queue:pollany2");
sync(queue3.put(2));
sync(queue1.put(1));
sync(queue2.put(3));
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
@ -44,12 +41,9 @@ public class RedissonBlockingQueueReactiveTest extends BaseReactiveTest {
@Test
public void testTake() throws InterruptedException {
RBlockingQueueReactive<Integer> queue1 = redisson.getBlockingQueue("queue:take");
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
RBlockingQueueReactive<Integer> queue = redisson.getBlockingQueue("queue:take");
sync(queue.put(3));
}
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingQueueReactive<Integer> queue = redisson.getBlockingQueue("queue:take");
sync(queue.put(3));
}, 10, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
@ -160,11 +154,8 @@ public class RedissonBlockingQueueReactiveTest extends BaseReactiveTest {
int total = 100;
for (int i = 0; i < total; i++) {
// runnable won't be executed in any particular order, and hence, int value as well.
executor.submit(new Runnable() {
@Override
public void run() {
redisson.getQueue("test_:blocking:queue:").add(counter.incrementAndGet());
}
executor.submit(() -> {
redisson.getQueue("test_:blocking:queue:").add(counter.incrementAndGet());
});
}
int count = 0;

@ -1,7 +1,5 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@ -22,12 +20,17 @@ import org.redisson.RedisRunner.RedisProcess;
import org.redisson.core.RBlockingQueue;
import io.netty.util.concurrent.Future;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testPollWithBrokenConnection() throws IOException, InterruptedException, ExecutionException {
RedisProcess runner = new RedisRunner().port(6319).run();
RedisProcess runner = new RedisRunner()
.port(6319)
.nosave()
.randomDir()
.run();
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6319");
@ -43,7 +46,11 @@ public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testPollReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException {
RedisProcess runner = new RedisRunner().port(6319).run();
RedisProcess runner = new RedisRunner()
.port(6319)
.nosave()
.randomDir()
.run();
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6319");
@ -54,7 +61,11 @@ public class RedissonBlockingQueueTest extends BaseTest {
f.await(1, TimeUnit.SECONDS);
runner.stop();
runner = new RedisRunner().port(6319).run();
runner = new RedisRunner()
.port(6319)
.nosave()
.randomDir()
.run();
queue1.put(123);
// check connection rotation
@ -71,7 +82,11 @@ public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testTakeReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException {
RedisProcess runner = new RedisRunner().port(6319).run();
RedisProcess runner = new RedisRunner()
.port(6319)
.nosave()
.randomDir()
.run();
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6319");
@ -81,7 +96,11 @@ public class RedissonBlockingQueueTest extends BaseTest {
f.await(1, TimeUnit.SECONDS);
runner.stop();
runner = new RedisRunner().port(6319).run();
runner = new RedisRunner()
.port(6319)
.nosave()
.randomDir()
.run();
queue1.put(123);
// check connection rotation
@ -135,18 +154,15 @@ public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testPollFromAny() throws InterruptedException {
final RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1");
RBlockingQueue<Integer> queue3 = redisson.getBlockingQueue("queue:pollany2");
try {
queue3.put(2);
queue1.put(1);
queue2.put(3);
} catch (InterruptedException e) {
Assert.fail();
}
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1");
RBlockingQueue<Integer> queue3 = redisson.getBlockingQueue("queue:pollany2");
try {
queue3.put(2);
queue1.put(1);
queue2.put(3);
} catch (InterruptedException e) {
Assert.fail();
}
}, 3, TimeUnit.SECONDS);
@ -160,16 +176,13 @@ public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testTake() throws InterruptedException {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:take");
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
RBlockingQueue<Integer> queue = redisson.getBlockingQueue("queue:take");
try {
queue.put(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingQueue<Integer> queue = redisson.getBlockingQueue("queue:take");
try {
queue.put(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, 10, TimeUnit.SECONDS);
@ -201,15 +214,12 @@ public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testPollLastAndOfferFirstTo() throws InterruptedException {
final RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("{queue}1");
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
try {
queue1.put(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
try {
queue1.put(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, 10, TimeUnit.SECONDS);
@ -321,11 +331,8 @@ public class RedissonBlockingQueueTest extends BaseTest {
int total = 100;
for (int i = 0; i < total; i++) {
// runnable won't be executed in any particular order, and hence, int value as well.
executor.submit(new Runnable() {
@Override
public void run() {
redisson.getQueue("test_:blocking:queue:").add(counter.incrementAndGet());
}
executor.submit(() -> {
redisson.getQueue("test_:blocking:queue:").add(counter.incrementAndGet());
});
}
int count = 0;

@ -3,10 +3,7 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;

@ -16,13 +16,10 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
ConcurrentMap<String, String> map = BaseTest.createInstance().getMap(name);
map.put("1", "122");
testSingleInstanceConcurrency(100, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
ConcurrentMap<String, String> map = redisson.getMap(name);
map.replace("1", "122", "32");
map.replace("1", "0", "31");
}
testSingleInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map1 = r.getMap(name);
map1.replace("1", "122", "32");
map1.replace("1", "0", "31");
});
ConcurrentMap<String, String> testMap = BaseTest.createInstance().getMap(name);
@ -37,12 +34,9 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
ConcurrentMap<String, String> map = BaseTest.createInstance().getMap(name);
map.putIfAbsent("1", "0");
testSingleInstanceConcurrency(100, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
ConcurrentMap<String, String> map = redisson.getMap(name);
map.remove("1", "0");
}
testSingleInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map1 = r.getMap(name);
map1.remove("1", "0");
});
assertMapSize(0, name);
@ -55,12 +49,9 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
ConcurrentMap<String, String> map = BaseTest.createInstance().getMap(name);
map.put("1", "0");
testSingleInstanceConcurrency(100, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
ConcurrentMap<String, String> map = redisson.getMap(name);
map.replace("1", "3");
}
testSingleInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map1 = r.getMap(name);
map1.replace("1", "3");
});
ConcurrentMap<String, String> testMap = BaseTest.createInstance().getMap(name);
@ -73,19 +64,15 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
public void test_Multi_Replace_MultiInstance() throws InterruptedException {
final String name = "test_Multi_Replace_MultiInstance";
RedissonClient redisson = BaseTest.createInstance();
ConcurrentMap<Integer, Integer> map = redisson.getMap(name);
ConcurrentMap<Integer, Integer> map = BaseTest.createInstance().getMap(name);
for (int i = 0; i < 5; i++) {
map.put(i, 1);
}
final SecureRandom secureRandom = new SecureRandom();
testSingleInstanceConcurrency(100, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
ConcurrentMap<Integer, Integer> map = redisson.getMap(name);
Assert.assertNotNull(map.replace(secureRandom.nextInt(5), 2));
}
testSingleInstanceConcurrency(100, r -> {
ConcurrentMap<Integer, Integer> map1 = r.getMap(name);
Assert.assertNotNull(map1.replace(secureRandom.nextInt(5), 2));
});
ConcurrentMap<Integer, Integer> testMap = BaseTest.createInstance().getMap(name);
@ -94,8 +81,6 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
}
assertMapSize(5, name);
redisson.getKeys().flushdb();
redisson.shutdown();
}
@Test
@ -108,12 +93,9 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
}
final SecureRandom secureRandom = new SecureRandom();
testMultiInstanceConcurrency(100, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
ConcurrentMap<String, String> map = redisson.getMap(name);
map.remove(secureRandom.nextInt(10), 1);
}
testMultiInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map1 = r.getMap(name);
map1.remove(secureRandom.nextInt(10), 1);
});
assertMapSize(0, name);
@ -125,12 +107,9 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
ConcurrentMap<String, String> map = BaseTest.createInstance().getMap(name);
map.putIfAbsent("1", "0");
testSingleInstanceConcurrency(100, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
ConcurrentMap<String, String> map = redisson.getMap(name);
map.putIfAbsent("1", "1");
}
testSingleInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map1 = r.getMap(name);
map1.putIfAbsent("1", "1");
});
ConcurrentMap<String, String> testMap = BaseTest.createInstance().getMap(name);
@ -142,12 +121,9 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
@Test
public void testMultiPutIfAbsent_SingleInstance() throws InterruptedException {
final String name = "testMultiPutIfAbsent_SingleInstance";
testSingleInstanceConcurrency(100, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
ConcurrentMap<String, String> map = redisson.getMap(name);
map.putIfAbsent("" + Math.random(), "1");
}
testSingleInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map = r.getMap(name);
map.putIfAbsent("" + Math.random(), "1");
});
assertMapSize(100, name);
@ -156,12 +132,9 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
@Test
public void testMultiPutIfAbsent_MultiInstance() throws InterruptedException {
final String name = "testMultiPutIfAbsent_MultiInstance";
testMultiInstanceConcurrency(100, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
ConcurrentMap<String, String> map = redisson.getMap(name);
map.putIfAbsent("" + Math.random(), "1");
}
testMultiInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map = r.getMap(name);
map.putIfAbsent("" + Math.random(), "1");
});
assertMapSize(100, name);
@ -176,12 +149,9 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
@Test
public void testMultiPut_SingleInstance() throws InterruptedException {
final String name = "testMultiPut_SingleInstance";
testSingleInstanceConcurrency(100, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
Map<String, String> map = redisson.getMap(name);
map.put("" + Math.random(), "1");
}
testSingleInstanceConcurrency(100, r -> {
Map<String, String> map = r.getMap(name);
map.put("" + Math.random(), "1");
});
assertMapSize(100, name);
@ -191,12 +161,9 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
@Test
public void testMultiPut_MultiInstance() throws InterruptedException {
final String name = "testMultiPut_MultiInstance";
testMultiInstanceConcurrency(100, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
ConcurrentMap<String, String> map = redisson.getMap(name);
map.putIfAbsent("" + Math.random(), "1");
}
testMultiInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map = r.getMap(name);
map.putIfAbsent("" + Math.random(), "1");
});
assertMapSize(100, name);

@ -1,15 +1,48 @@
package org.redisson;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.core.RCountDownLatch;
public class RedissonCountDownLatchConcurrentTest {
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
}
}
@AfterClass
public static void afterClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
@Before
public void before() throws IOException, InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
}
}
@After
public void after() throws InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
@Test
public void testSingleCountDownAwait_SingleInstance() throws InterruptedException {
@ -22,28 +55,22 @@ public class RedissonCountDownLatchConcurrentTest {
final AtomicInteger counter = new AtomicInteger();
ExecutorService executor = Executors.newScheduledThreadPool(iterations);
for (int i = 0; i < iterations; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
latch.await();
Assert.assertEquals(0, latch.getCount());
Assert.assertEquals(iterations, counter.get());
} catch (InterruptedException e) {
Assert.fail();
}
executor.execute(() -> {
try {
latch.await();
Assert.assertEquals(0, latch.getCount());
Assert.assertEquals(iterations, counter.get());
} catch (InterruptedException e) {
Assert.fail();
}
});
}
ExecutorService countDownExecutor = Executors.newFixedThreadPool(iterations);
for (int i = 0; i < iterations; i++) {
countDownExecutor.execute(new Runnable() {
@Override
public void run() {
latch.countDown();
counter.incrementAndGet();
}
countDownExecutor.execute(() -> {
latch.countDown();
counter.incrementAndGet();
});
}

@ -18,29 +18,23 @@ public class RedissonCountDownLatchTest extends BaseTest {
final RCountDownLatch latch = redisson.getCountDownLatch("latch1");
Assert.assertTrue(latch.trySetCount(1));
executor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Assert.fail();
}
latch.countDown();
executor.execute(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Assert.fail();
}
latch.countDown();
});
executor.execute(new Runnable() {
@Override
public void run() {
try {
Assert.assertEquals(1, latch.getCount());
boolean res = latch.await(550, TimeUnit.MILLISECONDS);
Assert.assertTrue(res);
} catch (InterruptedException e) {
Assert.fail();
}
executor.execute(() -> {
try {
Assert.assertEquals(1, latch.getCount());
boolean res = latch.await(550, TimeUnit.MILLISECONDS);
Assert.assertTrue(res);
} catch (InterruptedException e) {
Assert.fail();
}
});
@ -56,29 +50,23 @@ public class RedissonCountDownLatchTest extends BaseTest {
final RCountDownLatch latch = redisson.getCountDownLatch("latch1");
Assert.assertTrue(latch.trySetCount(1));
executor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Assert.fail();
}
latch.countDown();
executor.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Assert.fail();
}
latch.countDown();
});
executor.execute(new Runnable() {
@Override
public void run() {
try {
Assert.assertEquals(1, latch.getCount());
boolean res = latch.await(500, TimeUnit.MILLISECONDS);
Assert.assertFalse(res);
} catch (InterruptedException e) {
Assert.fail();
}
executor.execute(() -> {
try {
Assert.assertEquals(1, latch.getCount());
boolean res = latch.await(500, TimeUnit.MILLISECONDS);
Assert.assertFalse(res);
} catch (InterruptedException e) {
Assert.fail();
}
});

@ -122,14 +122,7 @@ public class RedissonDequeReactiveTest extends BaseReactiveTest {
final RDequeReactive<Integer> queue = redisson.getDeque("deque");
sync(queue.addAll(Arrays.asList(1, 2, 3)));
MatcherAssert.assertThat(new Iterable<Integer>() {
@Override
public Iterator<Integer> iterator() {
return toIterator(queue.descendingIterator());
}
}, Matchers.contains(3, 2, 1));
}
MatcherAssert.assertThat(() -> toIterator(queue.descendingIterator()), Matchers.contains(3, 2, 1));
}
}

@ -1,11 +1,13 @@
package org.redisson;
import java.io.IOException;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.core.GeoEntry;
@ -15,6 +17,18 @@ import org.redisson.core.RGeo;
public class RedissonGeoTest extends BaseTest {
@BeforeClass
public static void checkRedisVersion() throws IOException, InterruptedException {
boolean running = RedisRunner.isDefaultRedisServerInstanceRunning();
if (!running) {
RedisRunner.startDefaultRedisServerInstance();
}
Assume.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("3.1.0") > 0);
if (!running) {
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
@Test
public void testAdd() {
RGeo<String> geo = redisson.getGeo("test");

@ -35,7 +35,7 @@ public class RedissonKeysReactiveTest extends BaseReactiveTest {
MatcherAssert.assertThat(sync(redisson.getKeys().randomKey()), Matchers.isOneOf("test1", "test2"));
sync(redisson.getKeys().delete("test1"));
Assert.assertEquals(sync(redisson.getKeys().randomKey()), "test2");
Assert.assertEquals("test2", sync(redisson.getKeys().randomKey()));
sync(redisson.getKeys().flushdb());
Assert.assertNull(sync(redisson.getKeys().randomKey()));
}

@ -57,7 +57,7 @@ public class RedissonKeysTest extends BaseTest {
assertThat(redisson.getKeys().randomKey()).isIn("test1", "test2");
redisson.getKeys().delete("test1");
Assert.assertEquals(redisson.getKeys().randomKey(), "test2");
Assert.assertEquals("test2", redisson.getKeys().randomKey());
redisson.getKeys().flushdb();
Assert.assertNull(redisson.getKeys().randomKey());
}

@ -105,7 +105,7 @@ public class RedissonListMultimapCacheTest extends BaseTest {
Thread.sleep(1000);
assertThat(multimap.get("1").size()).isZero();
assertThat(multimap.get("1")).contains();
assertThat(multimap.get("1").isEmpty()).isTrue();
assertThat(multimap.get("1").remove("3")).isFalse();
assertThat(multimap.get("1").contains("3")).isFalse();
assertThat(multimap.get("1").retainAll(Arrays.asList("1"))).isFalse();

@ -2,9 +2,7 @@ package org.redisson;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.hamcrest.MatcherAssert;
@ -13,7 +11,6 @@ import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RListReactive;
import org.redisson.client.RedisException;
import org.redisson.core.RMap;
import reactor.rx.Promise;

@ -56,13 +56,10 @@ public class RedissonLockTest extends BaseConcurrentTest {
@Test
public void testAutoExpire() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
testSingleInstanceConcurrency(1, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
RLock lock = redisson.getLock("lock");
lock.lock();
latch.countDown();
}
testSingleInstanceConcurrency(1, r -> {
RLock lock = r.getLock("lock");
lock.lock();
latch.countDown();
});
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
@ -230,14 +227,11 @@ public class RedissonLockTest extends BaseConcurrentTest {
final AtomicInteger lockedCounter = new AtomicInteger();
int iterations = 15;
testSingleInstanceConcurrency(iterations, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
Lock lock = redisson.getLock("testConcurrency_SingleInstance");
lock.lock();
lockedCounter.incrementAndGet();
lock.unlock();
}
testSingleInstanceConcurrency(iterations, r -> {
Lock lock = r.getLock("testConcurrency_SingleInstance");
lock.lock();
lockedCounter.incrementAndGet();
lock.unlock();
});
Assert.assertEquals(iterations, lockedCounter.get());
@ -248,19 +242,16 @@ public class RedissonLockTest extends BaseConcurrentTest {
final int iterations = 100;
final AtomicInteger lockedCounter = new AtomicInteger();
testMultiInstanceConcurrency(16, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
for (int i = 0; i < iterations; i++) {
redisson.getLock("testConcurrency_MultiInstance1").lock();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
lockedCounter.incrementAndGet();
redisson.getLock("testConcurrency_MultiInstance1").unlock();
testMultiInstanceConcurrency(16, r -> {
for (int i = 0; i < iterations; i++) {
r.getLock("testConcurrency_MultiInstance1").lock();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
lockedCounter.incrementAndGet();
r.getLock("testConcurrency_MultiInstance1").unlock();
}
});
@ -272,14 +263,11 @@ public class RedissonLockTest extends BaseConcurrentTest {
int iterations = 100;
final AtomicInteger lockedCounter = new AtomicInteger();
testMultiInstanceConcurrency(iterations, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
Lock lock = redisson.getLock("testConcurrency_MultiInstance2");
lock.lock();
lockedCounter.incrementAndGet();
lock.unlock();
}
testMultiInstanceConcurrency(iterations, r -> {
Lock lock = r.getLock("testConcurrency_MultiInstance2");
lock.lock();
lockedCounter.incrementAndGet();
lock.unlock();
});
Assert.assertEquals(iterations, lockedCounter.get());

@ -11,21 +11,12 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.redisson.RedissonMapCacheTest.SimpleKey;
import org.redisson.RedissonMapCacheTest.SimpleValue;
import org.redisson.api.RMapReactive;
import org.redisson.codec.MsgPackJacksonCodec;
import org.redisson.core.RMap;
import org.redisson.core.RMapCache;
import reactor.rx.Streams;
public class RedissonMapReactiveTest extends BaseReactiveTest {

@ -14,6 +14,8 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.codec.StringCodec;
@ -192,12 +194,7 @@ public class RedissonMapTest extends BaseTest {
map.put(3, 300);
map.put(4, 400);
Map<Integer, Integer> filtered = map.filterKeys(new Predicate<Integer>() {
@Override
public boolean apply(Integer input) {
return input >= 2 && input <= 3;
}
});
Map<Integer, Integer> filtered = map.filterKeys(input -> input >= 2 && input <= 3);
Map<Integer, Integer> expectedMap = new HashMap<Integer, Integer>();
expectedMap.put(2, 200);

@ -1,8 +1,5 @@
package org.redisson;
import static com.jayway.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -13,6 +10,8 @@ import org.redisson.core.RedissonMultiLock;
import io.netty.channel.nio.NioEventLoopGroup;
import org.redisson.RedisRunner.RedisProcess;
import static com.jayway.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonMultiLockTest {
@ -110,18 +109,24 @@ public class RedissonMultiLockTest {
private RedisProcess redisTestMultilockInstance1() throws IOException, InterruptedException {
return new RedisRunner()
.nosave()
.randomDir()
.port(6320)
.run();
}
private RedisProcess redisTestMultilockInstance2() throws IOException, InterruptedException {
return new RedisRunner()
.nosave()
.randomDir()
.port(6321)
.run();
}
private RedisProcess redisTestMultilockInstance3() throws IOException, InterruptedException {
return new RedisRunner()
.nosave()
.randomDir()
.port(6322)
.run();
}

@ -190,13 +190,10 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
@Test
public void testAutoExpire() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
testSingleInstanceConcurrency(1, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
RReadWriteLock lock1 = redisson.getReadWriteLock("lock");
lock1.writeLock().lock();
latch.countDown();
}
testSingleInstanceConcurrency(1, r -> {
RReadWriteLock lock1 = r.getReadWriteLock("lock");
lock1.writeLock().lock();
latch.countDown();
});
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
@ -372,21 +369,17 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
final Random r = new SecureRandom();
int iterations = 15;
testSingleInstanceConcurrency(iterations, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
RReadWriteLock rwlock = redisson.getReadWriteLock("testConcurrency_SingleInstance");
RLock lock;
if (r.nextBoolean()) {
lock = rwlock.writeLock();
} else {
lock = rwlock.readLock();
}
lock.lock();
lockedCounter.incrementAndGet();
lock.unlock();
testSingleInstanceConcurrency(iterations, rc -> {
RReadWriteLock rwlock = rc.getReadWriteLock("testConcurrency_SingleInstance");
RLock lock;
if (r.nextBoolean()) {
lock = rwlock.writeLock();
} else {
lock = rwlock.readLock();
}
lock.lock();
lockedCounter.incrementAndGet();
lock.unlock();
});
Assert.assertEquals(iterations, lockedCounter.get());
@ -398,35 +391,30 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
final AtomicInteger lockedCounter = new AtomicInteger();
final Random r = new SecureRandom();
testMultiInstanceConcurrency(16, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
for (int i = 0; i < iterations; i++) {
boolean useWriteLock = r.nextBoolean();
RReadWriteLock rwlock = redisson.getReadWriteLock("testConcurrency_MultiInstance1");
RLock lock;
if (useWriteLock) {
lock = rwlock.writeLock();
} else {
lock = rwlock.readLock();
}
lock.lock();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
lockedCounter.incrementAndGet();
rwlock = redisson.getReadWriteLock("testConcurrency_MultiInstance1");
if (useWriteLock) {
lock = rwlock.writeLock();
} else {
lock = rwlock.readLock();
}
lock.unlock();
testMultiInstanceConcurrency(16, rc -> {
for (int i = 0; i < iterations; i++) {
boolean useWriteLock = r.nextBoolean();
RReadWriteLock rwlock = rc.getReadWriteLock("testConcurrency_MultiInstance1");
RLock lock;
if (useWriteLock) {
lock = rwlock.writeLock();
} else {
lock = rwlock.readLock();
}
lock.lock();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
lockedCounter.incrementAndGet();
rwlock = rc.getReadWriteLock("testConcurrency_MultiInstance1");
if (useWriteLock) {
lock = rwlock.writeLock();
} else {
lock = rwlock.readLock();
}
lock.unlock();
}
});
@ -439,21 +427,17 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
final AtomicInteger lockedCounter = new AtomicInteger();
final Random r = new SecureRandom();
testMultiInstanceConcurrency(iterations, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
RReadWriteLock rwlock = redisson.getReadWriteLock("testConcurrency_MultiInstance2");
RLock lock;
if (r.nextBoolean()) {
lock = rwlock.writeLock();
} else {
lock = rwlock.readLock();
}
lock.lock();
lockedCounter.incrementAndGet();
lock.unlock();
testMultiInstanceConcurrency(iterations, rc -> {
RReadWriteLock rwlock = rc.getReadWriteLock("testConcurrency_MultiInstance2");
RLock lock;
if (r.nextBoolean()) {
lock = rwlock.writeLock();
} else {
lock = rwlock.readLock();
}
lock.lock();
lockedCounter.incrementAndGet();
lock.unlock();
});
Assert.assertEquals(iterations, lockedCounter.get());

@ -0,0 +1,23 @@
package org.redisson;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
public class RedissonRuntimeEnvironment {
public static final boolean isTravis = "true".equalsIgnoreCase(System.getProperty("travisEnv"));
public static final String redisBinaryPath = System.getProperty("redisBinary", "C:\\Devel\\projects\\redis\\Redis-x64-3.0.500\\redis-server.exe");
public static final String tempDir = System.getProperty("java.io.tmpdir");
public static final String OS;
public static final boolean isWindows;
static {
OS = System.getProperty("os.name", "generic");
isWindows = OS.toLowerCase(Locale.ENGLISH).contains("win");
}
}

@ -12,7 +12,6 @@ import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.ScoredEntry;

@ -22,6 +22,7 @@ import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RSortedSet;
import io.netty.util.concurrent.Future;
import org.junit.Assume;
public class RedissonScoredSortedSetTest extends BaseTest {
@ -64,6 +65,7 @@ public class RedissonScoredSortedSetTest extends BaseTest {
@Test
public void testTryAdd() {
Assume.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("3.0.2") >= 0);
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
assertThat(set.tryAdd(123.81, "1980")).isTrue();

@ -3,10 +3,12 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assume;
import org.junit.Test;
import org.redisson.core.RSemaphore;
@ -154,20 +156,17 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
s.setPermits(1);
int iterations = 15;
testSingleInstanceConcurrency(iterations, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
RSemaphore s = redisson.getSemaphore("test");
try {
s.acquire();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int value = lockedCounter.get();
lockedCounter.set(value + 1);
s.release();
testSingleInstanceConcurrency(iterations, r -> {
RSemaphore s1 = r.getSemaphore("test");
try {
s1.acquire();
}catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int value = lockedCounter.get();
lockedCounter.set(value + 1);
s1.release();
});
assertThat(lockedCounter.get()).isEqualTo(iterations);
@ -181,25 +180,22 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(1);
testMultiInstanceConcurrency(16, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
for (int i = 0; i < iterations; i++) {
try {
redisson.getSemaphore("test").acquire();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
int value = lockedCounter.get();
lockedCounter.set(value + 1);
redisson.getSemaphore("test").release();
testMultiInstanceConcurrency(16, r -> {
for (int i = 0; i < iterations; i++) {
try {
r.getSemaphore("test").acquire();
}catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
int value = lockedCounter.get();
lockedCounter.set(value + 1);
r.getSemaphore("test").release();
}
});
@ -214,20 +210,17 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(1);
testMultiInstanceConcurrency(iterations, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
RSemaphore s = redisson.getSemaphore("test");
try {
s.acquire();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int value = lockedCounter.get();
lockedCounter.set(value + 1);
s.release();
testMultiInstanceConcurrency(iterations, r -> {
RSemaphore s1 = r.getSemaphore("test");
try {
s1.acquire();
}catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int value = lockedCounter.get();
lockedCounter.set(value + 1);
s1.release();
});
assertThat(lockedCounter.get()).isEqualTo(iterations);
@ -235,6 +228,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testConcurrency_MultiInstance_10_permits() throws InterruptedException {
Assume.assumeFalse(Boolean.valueOf(System.getProperty("travisEnv")));
int iterations = 100;
final AtomicInteger lockedCounter = new AtomicInteger();
@ -243,33 +237,27 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
final AtomicInteger checkPermits = new AtomicInteger(s.availablePermits());
final CyclicBarrier barrier = new CyclicBarrier(s.availablePermits());
testMultiInstanceConcurrency(iterations, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
RSemaphore s = redisson.getSemaphore("test");
try {
s.acquire();
barrier.await();
if (checkPermits.decrementAndGet() > 0) {
assertThat(s.availablePermits()).isEqualTo(0);
assertThat(s.tryAcquire()).isFalse();
} else {
Thread.sleep(50);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
testMultiInstanceConcurrencySequentiallyLaunched(iterations, r -> {
RSemaphore s1 = r.getSemaphore("test");
try {
s1.acquire();
barrier.await();
if (checkPermits.decrementAndGet() > 0) {
assertThat(s1.availablePermits()).isEqualTo(0);
assertThat(s1.tryAcquire()).isFalse();
} else {
Thread.sleep(50);
}
int value = lockedCounter.get();
lockedCounter.set(value + 1);
s.release();
}catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int value = lockedCounter.get();
lockedCounter.set(value + 1);
s1.release();
});
System.out.println(lockedCounter.get());

@ -16,11 +16,7 @@ import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetReactive;
import org.redisson.codec.MsgPackJacksonCodec;
import org.redisson.core.RSetCache;
import reactor.rx.Streams;
public class RedissonSetCacheReactiveTest extends BaseReactiveTest {

@ -105,7 +105,7 @@ public class RedissonSetMultimapCacheTest extends BaseTest {
Thread.sleep(1500);
assertThat(multimap.get("1").size()).isZero();
assertThat(multimap.get("1")).contains();
assertThat(multimap.get("1").isEmpty()).isTrue();
assertThat(multimap.get("1").remove("3")).isFalse();
assertThat(multimap.get("1").contains("3")).isFalse();
assertThat(multimap.get("1").retainAll(Arrays.asList("1"))).isFalse();

@ -33,18 +33,6 @@ public class RedissonSetTest extends BaseTest {
}
@Test
public void testRemoveAll() {
RSet<Integer> set = redisson.getSet("set");
set.add(1);
set.add(2);
set.add(3);
assertThat(set.removeAll(Arrays.asList(1, 3))).isTrue();
assertThat(set.removeAll(Arrays.asList(1, 3))).isFalse();
assertThat(set).containsOnly(2);
}
@Test
public void testRemoveRandom() {
RSet<Integer> set = redisson.getSet("simple");
@ -438,4 +426,41 @@ public class RedissonSetTest extends BaseTest {
Assert.assertEquals(1, set.size());
Assert.assertEquals(0, otherSet.size());
}
@Test
public void testRemoveAllEmpty() {
Set<Integer> list = redisson.getSet("list");
list.add(1);
list.add(2);
list.add(3);
list.add(4);
list.add(5);
Assert.assertFalse(list.removeAll(Collections.emptyList()));
Assert.assertFalse(Arrays.asList(1).removeAll(Collections.emptyList()));
}
@Test
public void testRemoveAll() {
Set<Integer> list = redisson.getSet("list");
list.add(1);
list.add(2);
list.add(3);
list.add(4);
list.add(5);
Assert.assertFalse(list.removeAll(Collections.emptyList()));
Assert.assertTrue(list.removeAll(Arrays.asList(3, 2, 10, 6)));
assertThat(list).containsExactly(1, 4, 5);
Assert.assertTrue(list.removeAll(Arrays.asList(4)));
assertThat(list).containsExactly(1, 5);
Assert.assertTrue(list.removeAll(Arrays.asList(1, 5, 1, 5)));
Assert.assertTrue(list.isEmpty());
}
}

@ -1,8 +1,5 @@
package org.redisson;
import static com.jayway.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
@ -10,9 +7,13 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.client.RedisConnectionException;
@ -23,11 +24,52 @@ import org.redisson.connection.ConnectionListener;
import org.redisson.core.ClusterNode;
import org.redisson.core.Node;
import org.redisson.core.NodesGroup;
import static com.jayway.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.redisson.BaseTest.createInstance;
public class RedissonTest {
RedissonClient redisson;
protected RedissonClient redisson;
protected static RedissonClient defaultRedisson;
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
defaultRedisson = BaseTest.createInstance();
}
}
@AfterClass
public static void afterClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
defaultRedisson.shutdown();
}
}
@Before
public void before() throws IOException, InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
redisson = createInstance();
} else {
if (redisson == null) {
redisson = defaultRedisson;
}
redisson.getKeys().flushall();
}
}
@After
public void after() throws InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
redisson.shutdown();
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
public static class Dummy {
private String field;
}
@ -167,9 +209,9 @@ public class RedissonTest {
NodesGroup<Node> nodes = redisson.getNodesGroup();
Assert.assertEquals(5, nodes.getNodes().size());
for (Node node : nodes.getNodes()) {
nodes.getNodes().stream().forEach((node) -> {
Assert.assertTrue(node.ping());
}
});
Assert.assertTrue(nodes.pingAll());
}
@ -207,11 +249,11 @@ public class RedissonTest {
NodesGroup<ClusterNode> nodes = redisson.getClusterNodesGroup();
Assert.assertEquals(2, nodes.getNodes().size());
for (ClusterNode node : nodes.getNodes()) {
nodes.getNodes().stream().forEach((node) -> {
Map<String, String> params = node.info();
Assert.assertNotNull(params);
Assert.assertTrue(node.ping());
}
});
Assert.assertTrue(nodes.pingAll());
}
@ -276,12 +318,16 @@ public class RedissonTest {
private RedisProcess redisTestSmallMemory() throws IOException, InterruptedException {
return new RedisRunner()
.maxmemory("1mb")
.nosave()
.randomDir()
.port(6319)
.run();
}
private RedisProcess redisTestConnection() throws IOException, InterruptedException {
return new RedisRunner()
.nosave()
.randomDir()
.port(6319)
.run();
}

@ -0,0 +1,100 @@
package org.redisson;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.IntStream;
import org.junit.runner.Description;
import org.junit.runner.Result;
import org.junit.runner.notification.Failure;
import org.junit.runner.notification.RunListener;
public class RedissonTestRunListener extends RunListener {
private final AtomicBoolean running = new AtomicBoolean(Boolean.TRUE);
@Override
public void testRunStarted(Description description) throws Exception {
super.testRunStarted(description);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
running.set(Boolean.FALSE);
}));
new Thread(() -> {
final RuntimeMXBean runtimeBean = ManagementFactory.getRuntimeMXBean();
final AtomicLong u = new AtomicLong(runtimeBean.getUptime());
while (running.get()) {
try {
long upTime = runtimeBean.getUptime();
if (upTime >= u.get() + 10000) {
u.set(upTime);
System.out.printf("Test Up Time = %.3f (s)%n", upTime / 1000d);
System.out.printf("Heap Usage = %.3f (MB)%n", ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed() / 1024d / 1024d);
System.out.printf("None Heap Usage = %.3f (MB)%n", ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage().getUsed() / 1024d / 1024d);
System.out.println("=============================");
}
Thread.currentThread().sleep(10000l);
} catch (InterruptedException ex) {
Logger.getLogger(RedissonTestRunListener.class.getName()).log(Level.SEVERE, null, ex);
}
}
}).start();
}
@Override
public void testRunFinished(Result result) throws Exception {
super.testRunFinished(result);
running.set(Boolean.FALSE);
}
@Override
public void testStarted(Description d) throws Exception {
super.testStarted(d);
printTestName("Started", d.getDisplayName(), '*');
}
@Override
public void testFinished(Description d) throws Exception {
super.testFinished(d);
printTestName("Finished", d.getDisplayName());
}
@Override
public void testIgnored(Description d) throws Exception {
super.testIgnored(d);
printTestName("Ignored", d.getDisplayName());
}
@Override
public void testFailure(Failure f) throws Exception {
super.testFailure(f);
printTestName("Failed", f.getTestHeader());
}
@Override
public void testAssumptionFailure(Failure f) {
super.testAssumptionFailure(f);
printTestName("Assumption Failed", f.getTestHeader());
}
private static void printTestName(String action, String test) {
printTestName(action, test, '=');
}
private static void printTestName(String action, String test, char c) {
int dividers = 16 + action.length() + test.length();
aBeautifulDivider(dividers, c);
System.out.println(" " + action + " Test: " + test);
aBeautifulDivider(dividers, c);
}
private static void aBeautifulDivider(int times, char c) {
System.out.println("");
IntStream.iterate(0, n -> n++)
.limit(times)
.forEach((i) -> System.out.print(c));
System.out.println("\n");
}
}

@ -1,9 +1,14 @@
package org.redisson;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.core.BasePatternStatusListener;
import org.redisson.core.MessageListener;
@ -13,6 +18,34 @@ import org.redisson.core.RTopic;
public class RedissonTopicPatternTest {
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
}
}
@AfterClass
public static void afterClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
@Before
public void before() throws IOException, InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
}
}
@After
public void after() throws InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
public static class Message {
private String name;
@ -51,26 +84,20 @@ public class RedissonTopicPatternTest {
}
}
@Test
@Test(timeout = 300 * 1000)
public void testUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
RedissonClient redisson = BaseTest.createInstance();
RPatternTopic<Message> topic1 = redisson.getPatternTopic("topic1.*");
int listenerId = topic1.addListener(new PatternMessageListener<Message>() {
@Override
public void onMessage(String pattern, String channel, Message msg) {
Assert.fail();
}
int listenerId = topic1.addListener((pattern, channel, msg) -> {
Assert.fail();
});
topic1.addListener(new PatternMessageListener<Message>() {
@Override
public void onMessage(String pattern, String channel, Message msg) {
Assert.assertEquals("topic1.*", pattern);
Assert.assertEquals("topic1.t3", channel);
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
}
topic1.addListener((pattern, channel, msg) -> {
Assert.assertEquals("topic1.*", pattern);
Assert.assertEquals("topic1.t3", channel);
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
});
topic1.removeListener(listenerId);
@ -82,17 +109,14 @@ public class RedissonTopicPatternTest {
redisson.shutdown();
}
@Test
@Test(timeout = 300 * 1000)
public void testLazyUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
RedissonClient redisson1 = BaseTest.createInstance();
RPatternTopic<Message> topic1 = redisson1.getPatternTopic("topic.*");
int listenerId = topic1.addListener(new PatternMessageListener<Message>() {
@Override
public void onMessage(String pattern, String channel, Message msg) {
Assert.fail();
}
int listenerId = topic1.addListener((pattern, channel, msg) -> {
Assert.fail();
});
Thread.sleep(1000);
@ -101,14 +125,11 @@ public class RedissonTopicPatternTest {
RedissonClient redisson2 = BaseTest.createInstance();
RPatternTopic<Message> topic2 = redisson2.getPatternTopic("topic.*");
topic2.addListener(new PatternMessageListener<Message>() {
@Override
public void onMessage(String pattern, String channel, Message msg) {
Assert.assertEquals("topic.*", pattern);
Assert.assertEquals("topic.t1", channel);
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
}
topic2.addListener((pattern, channel, msg) -> {
Assert.assertEquals("topic.*", pattern);
Assert.assertEquals("topic.t1", channel);
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
});
RTopic<Message> topic3 = redisson2.getTopic("topic.t1");
@ -120,7 +141,7 @@ public class RedissonTopicPatternTest {
redisson2.shutdown();
}
@Test
@Test(timeout = 600 * 1000)
public void test() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(5);
@ -134,22 +155,16 @@ public class RedissonTopicPatternTest {
statusRecieved.countDown();
}
});
topic1.addListener(new PatternMessageListener<Message>() {
@Override
public void onMessage(String pattern, String channel, Message msg) {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
}
topic1.addListener((pattern, channel, msg) -> {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
});
RedissonClient redisson2 = BaseTest.createInstance();
RTopic<Message> topic2 = redisson2.getTopic("topic.t1");
topic2.addListener(new MessageListener<Message>() {
@Override
public void onMessage(String channel, Message msg) {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
}
topic2.addListener((channel, msg) -> {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
});
topic2.publish(new Message("123"));
topic2.publish(new Message("123"));
@ -169,7 +184,7 @@ public class RedissonTopicPatternTest {
redisson2.shutdown();
}
@Test
@Test(timeout = 300 * 1000)
public void testListenerRemove() throws InterruptedException {
RedissonClient redisson1 = BaseTest.createInstance();
RPatternTopic<Message> topic1 = redisson1.getPatternTopic("topic.*");
@ -181,11 +196,8 @@ public class RedissonTopicPatternTest {
l.countDown();
}
});
int id = topic1.addListener(new PatternMessageListener<Message>() {
@Override
public void onMessage(String pattern, String channel, Message msg) {
Assert.fail();
}
int id = topic1.addListener((pattern, channel, msg) -> {
Assert.fail();
});
RedissonClient redisson2 = BaseTest.createInstance();

@ -1,10 +1,15 @@
package org.redisson;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.core.BaseStatusListener;
import org.redisson.core.MessageListener;
@ -13,6 +18,34 @@ import org.redisson.core.RTopic;
public class RedissonTopicTest {
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
}
}
@AfterClass
public static void afterClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
@Before
public void before() throws IOException, InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
}
}
@After
public void after() throws InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
public static class Message implements Serializable {
private String name;
@ -47,21 +80,17 @@ public class RedissonTopicTest {
}
@Test
@Test(timeout = 300 * 1000)
public void testSyncCommands() throws InterruptedException {
RedissonClient redisson = BaseTest.createInstance();
RTopic<String> topic = redisson.getTopic("system_bus");
RSet<String> redissonSet = redisson.getSet("set1");
CountDownLatch latch = new CountDownLatch(1);
topic.addListener(new MessageListener<String>() {
@Override
public void onMessage(String channel, String msg) {
for (int j = 0; j < 1000; j++) {
redissonSet.contains("" + j);
}
latch.countDown();
topic.addListener((channel, msg) -> {
for (int j = 0; j < 1000; j++) {
redissonSet.contains("" + j);
}
latch.countDown();
});
topic.publish("sometext");
@ -70,31 +99,25 @@ public class RedissonTopicTest {
redisson.shutdown();
}
@Test
@Test(timeout = 300 * 1000)
public void testInnerPublish() throws InterruptedException {
RedissonClient redisson1 = BaseTest.createInstance();
final RTopic<Message> topic1 = redisson1.getTopic("topic1");
final CountDownLatch messageRecieved = new CountDownLatch(3);
int listenerId = topic1.addListener(new MessageListener<Message>() {
@Override
public void onMessage(String channel, Message msg) {
Assert.assertEquals(msg, new Message("test"));
messageRecieved.countDown();
}
int listenerId = topic1.addListener((channel, msg) -> {
Assert.assertEquals(msg, new Message("test"));
messageRecieved.countDown();
});
RedissonClient redisson2 = BaseTest.createInstance();
final RTopic<Message> topic2 = redisson2.getTopic("topic2");
topic2.addListener(new MessageListener<Message>() {
@Override
public void onMessage(String channel, Message msg) {
messageRecieved.countDown();
Message m = new Message("test");
if (!msg.equals(m)) {
topic1.publish(m);
topic2.publish(m);
}
topic2.addListener((channel, msg) -> {
messageRecieved.countDown();
Message m = new Message("test");
if (!msg.equals(m)) {
topic1.publish(m);
topic2.publish(m);
}
});
topic2.publish(new Message("123"));
@ -105,7 +128,7 @@ public class RedissonTopicTest {
redisson2.shutdown();
}
@Test
@Test(timeout = 300 * 1000)
public void testStatus() throws InterruptedException {
RedissonClient redisson = BaseTest.createInstance();
final RTopic<Message> topic1 = redisson.getTopic("topic1");
@ -133,25 +156,19 @@ public class RedissonTopicTest {
Assert.assertTrue(l.await(5, TimeUnit.SECONDS));
}
@Test
@Test(timeout = 300 * 1000)
public void testUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
RedissonClient redisson = BaseTest.createInstance();
RTopic<Message> topic1 = redisson.getTopic("topic1");
int listenerId = topic1.addListener(new MessageListener<Message>() {
@Override
public void onMessage(String channel, Message msg) {
Assert.fail();
}
int listenerId = topic1.addListener((channel, msg) -> {
Assert.fail();
});
topic1.addListener(new MessageListener<Message>() {
@Override
public void onMessage(String channel, Message msg) {
Assert.assertEquals("topic1", channel);
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
}
topic1.addListener((channel, msg) -> {
Assert.assertEquals("topic1", channel);
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
});
topic1.removeListener(listenerId);
@ -164,17 +181,14 @@ public class RedissonTopicTest {
}
@Test
@Test(timeout = 300 * 1000)
public void testLazyUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
RedissonClient redisson1 = BaseTest.createInstance();
RTopic<Message> topic1 = redisson1.getTopic("topic");
int listenerId = topic1.addListener(new MessageListener<Message>() {
@Override
public void onMessage(String channel, Message msg) {
Assert.fail();
}
int listenerId = topic1.addListener((channel, msg) -> {
Assert.fail();
});
Thread.sleep(1000);
topic1.removeListener(listenerId);
@ -182,12 +196,9 @@ public class RedissonTopicTest {
RedissonClient redisson2 = BaseTest.createInstance();
RTopic<Message> topic2 = redisson2.getTopic("topic");
topic2.addListener(new MessageListener<Message>() {
@Override
public void onMessage(String channel, Message msg) {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
}
topic2.addListener((channel, msg) -> {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
});
topic2.publish(new Message("123"));
@ -198,28 +209,22 @@ public class RedissonTopicTest {
}
@Test
@Test(timeout = 300 * 1000)
public void test() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(2);
RedissonClient redisson1 = BaseTest.createInstance();
RTopic<Message> topic1 = redisson1.getTopic("topic");
topic1.addListener(new MessageListener<Message>() {
@Override
public void onMessage(String channel, Message msg) {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
}
topic1.addListener((channel, msg) -> {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
});
RedissonClient redisson2 = BaseTest.createInstance();
RTopic<Message> topic2 = redisson2.getTopic("topic");
topic2.addListener(new MessageListener<Message>() {
@Override
public void onMessage(String channel, Message msg) {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
}
topic2.addListener((channel, msg) -> {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
});
topic2.publish(new Message("123"));
@ -231,29 +236,23 @@ public class RedissonTopicTest {
volatile long counter;
@Test
@Test(timeout = 600 * 1000)
public void testHeavyLoad() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1000);
RedissonClient redisson1 = BaseTest.createInstance();
RTopic<Message> topic1 = redisson1.getTopic("topic");
topic1.addListener(new MessageListener<Message>() {
@Override
public void onMessage(String channel, Message msg) {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
counter++;
}
topic1.addListener((channel, msg) -> {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
counter++;
});
RedissonClient redisson2 = BaseTest.createInstance();
RTopic<Message> topic2 = redisson2.getTopic("topic");
topic2.addListener(new MessageListener<Message>() {
@Override
public void onMessage(String channel, Message msg) {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
}
topic2.addListener((channel, msg) -> {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
});
for (int i = 0; i < 5000; i++) {
@ -269,15 +268,12 @@ public class RedissonTopicTest {
redisson1.shutdown();
redisson2.shutdown();
}
@Test
@Test(timeout = 300 * 1000)
public void testListenerRemove() throws InterruptedException {
RedissonClient redisson1 = BaseTest.createInstance();
RTopic<Message> topic1 = redisson1.getTopic("topic");
int id = topic1.addListener(new MessageListener<Message>() {
@Override
public void onMessage(String channel, Message msg) {
Assert.fail();
}
int id = topic1.addListener((channel, msg) -> {
Assert.fail();
});
RedissonClient redisson2 = BaseTest.createInstance();

@ -1,5 +1,6 @@
package org.redisson;
import java.io.IOException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -14,6 +15,9 @@ import org.redisson.core.RLock;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import static org.redisson.BaseTest.createInstance;
@RunWith(Parameterized.class)
public class RedissonTwoLockedThread {
@ -28,16 +32,37 @@ public class RedissonTwoLockedThread {
private RedissonClient redisson;
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
}
}
@AfterClass
public static void afterClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
@Before
public void before() {
public void before() throws IOException, InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
redisson = createInstance();
}
Config config = BaseTest.createConfig();
config.setCodec(codec);
redisson = Redisson.create(config);
}
@After
public void after() {
public void after() throws InterruptedException {
redisson.shutdown();
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
@Test(timeout = 3000)

@ -2,24 +2,19 @@ package org.redisson.spring.cache;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.Redisson;
import org.redisson.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.SerializationCodec;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
@ -62,26 +57,26 @@ public class RedissonSpringCacheTest {
@Service
public static class SampleBean {
@CachePut(cacheNames="testMap", key="#key")
@CachePut(cacheNames = "testMap", key = "#key")
public SampleObject store(String key, SampleObject object) {
return object;
}
@CachePut(cacheNames="testMap", key="#key")
@CachePut(cacheNames = "testMap", key = "#key")
public SampleObject storeNull(String key) {
return null;
}
@CacheEvict(cacheNames="testMap", key="#key")
@CacheEvict(cacheNames = "testMap", key = "#key")
public void remove(String key) {
}
@Cacheable(cacheNames="testMap", key="#key")
@Cacheable(cacheNames = "testMap", key = "#key")
public SampleObject read(String key) {
throw new IllegalStateException();
}
@Cacheable(cacheNames="testMap", key="#key")
@Cacheable(cacheNames = "testMap", key = "#key")
public SampleObject readNull(String key) {
return null;
}
@ -93,7 +88,7 @@ public class RedissonSpringCacheTest {
@EnableCaching
public static class Application {
@Bean(destroyMethod="shutdown")
@Bean(destroyMethod = "shutdown")
RedissonClient redisson() {
return Redisson.create();
}
@ -101,7 +96,7 @@ public class RedissonSpringCacheTest {
@Bean
CacheManager cacheManager(RedissonClient redissonClient) throws IOException {
Map<String, CacheConfig> config = new HashMap<String, CacheConfig>();
config.put("testMap", new CacheConfig(24*60*1000, 12*60*1000));
config.put("testMap", new CacheConfig(24 * 60 * 1000, 12 * 60 * 1000));
return new RedissonSpringCacheManager(redissonClient, config);
}
@ -112,7 +107,7 @@ public class RedissonSpringCacheTest {
@EnableCaching
public static class JsonConfigApplication {
@Bean(destroyMethod="shutdown")
@Bean(destroyMethod = "shutdown")
RedissonClient redisson() {
return Redisson.create();
}
@ -124,10 +119,14 @@ public class RedissonSpringCacheTest {
}
private static RedisProcess p;
@Parameterized.Parameters(name= "{index} - {0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
@Parameterized.Parameters(name = "{index} - {0}")
public static Iterable<Object[]> data() throws IOException, InterruptedException {
if (p == null) {
p = RedisRunner.startDefaultRedisServerInstance();
}
return Arrays.asList(new Object[][]{
{new AnnotationConfigApplicationContext(Application.class)},
{new AnnotationConfigApplicationContext(JsonConfigApplication.class)}
});
@ -137,8 +136,9 @@ public class RedissonSpringCacheTest {
public AnnotationConfigApplicationContext context;
@AfterClass
public static void after() {
RedissonSpringCacheTest.data().forEach(e -> ((ConfigurableApplicationContext)e[0]).close());
public static void after() throws InterruptedException, IOException {
RedissonSpringCacheTest.data().forEach(e -> ((ConfigurableApplicationContext) e[0]).close());
RedisRunner.shutDownDefaultRedisServerInstance();
}
@Test

Loading…
Cancel
Save