Merge branch 'master' into 3.0.0

pull/1303/head
Nikita 7 years ago
commit cce0cf3809

@ -162,6 +162,12 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
@Override
public void start() throws LifecycleException {
redisson = buildClient();
lifecycle.fireLifecycleEvent(START_EVENT, null);
}
protected RedissonClient buildClient() throws LifecycleException {
Config config = null;
try {
config = Config.fromJSON(new File(configPath), getClass().getClassLoader());
@ -176,12 +182,10 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
}
try {
redisson = Redisson.create(config);
return Redisson.create(config);
} catch (Exception e) {
throw new LifecycleException(e);
}
lifecycle.fireLifecycleEvent(START_EVENT, null);
}
@Override

@ -143,6 +143,13 @@ public class RedissonSessionManager extends ManagerBase {
@Override
protected void startInternal() throws LifecycleException {
super.startInternal();
redisson = buildClient();
setState(LifecycleState.STARTING);
}
protected RedissonClient buildClient() throws LifecycleException {
Config config = null;
try {
config = Config.fromJSON(new File(configPath), getClass().getClassLoader());
@ -166,12 +173,10 @@ public class RedissonSessionManager extends ManagerBase {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
}
redisson = Redisson.create(config);
return Redisson.create(config);
} catch (Exception e) {
throw new LifecycleException(e);
}
setState(LifecycleState.STARTING);
}
@Override

@ -144,6 +144,12 @@ public class RedissonSessionManager extends ManagerBase {
@Override
protected void startInternal() throws LifecycleException {
super.startInternal();
redisson = buildClient();
setState(LifecycleState.STARTING);
}
protected RedissonClient buildClient() throws LifecycleException {
Config config = null;
try {
config = Config.fromJSON(new File(configPath), getClass().getClassLoader());
@ -163,12 +169,10 @@ public class RedissonSessionManager extends ManagerBase {
.newInstance(Thread.currentThread().getContextClassLoader());
config.setCodec(codec);
redisson = Redisson.create(config);
return Redisson.create(config);
} catch (Exception e) {
throw new LifecycleException(e);
}
setState(LifecycleState.STARTING);
}
@Override

@ -81,9 +81,9 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.awaitility</groupId>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>1.7.0</version>
<version>3.0.0</version>
<scope>test</scope>
</dependency>
<dependency>

@ -126,8 +126,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
RFuture<V> res = takeLastAndOfferFirstToAsync(queueName);
return res.await().getNow();
return get(takeLastAndOfferFirstToAsync(queueName));
}
@Override
@ -194,8 +193,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override
public V takeFirst() throws InterruptedException {
RFuture<V> res = takeFirstAsync();
return res.await().getNow();
return get(takeFirstAsync());
}
@Override
@ -210,8 +208,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override
public V takeLast() throws InterruptedException {
RFuture<V> res = takeLastAsync();
return res.await().getNow();
return get(takeLastAsync());
}
@Override
@ -221,8 +218,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override
public V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException {
RFuture<V> res = pollFirstFromAnyAsync(timeout, unit, queueNames);
return res.await().getNow();
return get(pollFirstFromAnyAsync(timeout, unit, queueNames));
}
@Override
@ -232,8 +228,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override
public V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException {
RFuture<V> res = pollLastFromAnyAsync(timeout, unit, queueNames);
return res.await().getNow();
return get(pollLastFromAnyAsync(timeout, unit, queueNames));
}
@Override
@ -249,8 +244,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override
public V pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
RFuture<V> res = pollFirstAsync(timeout, unit);
return res.await().getNow();
return get(pollFirstAsync(timeout, unit));
}
@Override
@ -260,8 +254,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override
public V pollLast(long timeout, TimeUnit unit) throws InterruptedException {
RFuture<V> res = pollLastAsync(timeout, unit);
return res.await().getNow();
return get(pollLastAsync(timeout, unit));
}
}

@ -79,8 +79,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
*/
@Override
public V take() throws InterruptedException {
RFuture<V> res = takeAsync();
return res.await().getNow();
return get(takeAsync());
}
@Override
@ -94,8 +93,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
*/
@Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException {
RFuture<V> res = pollAsync(timeout, unit);
return res.await().getNow();
return get(pollAsync(timeout, unit));
}
/*
@ -104,8 +102,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
*/
@Override
public V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException {
RFuture<V> res = pollFromAnyAsync(timeout, unit, queueNames);
return res.await().getNow();
return get(pollFromAnyAsync(timeout, unit, queueNames));
}
/*
@ -130,14 +127,12 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
RFuture<V> res = pollLastAndOfferFirstToAsync(queueName, timeout, unit);
return res.await().getNow();
return get(pollLastAndOfferFirstToAsync(queueName, timeout, unit));
}
@Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
RFuture<V> res = takeLastAndOfferFirstToAsync(queueName);
return res.await().getNow();
return get(takeLastAndOfferFirstToAsync(queueName));
}
@Override

@ -31,7 +31,6 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.SemaphorePubSub;
@ -256,8 +255,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
RFuture<V> res = takeLastAndOfferFirstToAsync(queueName);
return res.await().getNow();
return get(takeLastAndOfferFirstToAsync(queueName));
}
@Override

@ -53,6 +53,10 @@ public class RedissonReadLock extends RedissonLock implements RLock {
return super.getLockName(threadId) + ":write";
}
String getReadWriteTimeoutNamePrefix(long threadId) {
return suffixName(getName(), getLockName(threadId)) + ":rwlock_timeout";
}
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
@ -62,25 +66,27 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('set', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:1', 1); " +
"redis.call('pexpire', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:1', ARGV[1]); " +
"redis.call('set', KEYS[2] .. ':1', 1); " +
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local key = KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:' .. ind;" +
"local key = KEYS[2] .. ':' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName()), internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),
internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
@ -96,7 +102,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"if (counter == 0) then " +
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;" +
"redis.call('del', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:' .. (counter+1)); " +
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
"local keys = redis.call('hkeys', KEYS[1]); " +
@ -104,7 +110,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"local remainTime = redis.call('pttl', KEYS[1] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
"local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
@ -123,7 +129,8 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; ",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, getLockName(threadId));
Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, timeoutPrefix.split(":")[0]),
LockPubSub.unlockMessage, getLockName(threadId));
}
@Override

@ -68,13 +68,15 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
"end; " +
"if (mode == 'write') then " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local currentExpire = redis.call('pttl', KEYS[1]); " +
"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
"return nil; " +
"end; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName()), internalLockLeaseTime, getLockName(threadId));
Arrays.<Object>asList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
@Override
@ -108,7 +110,8 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
"end; " +
"end; "
+ "return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
Arrays.<Object>asList(getName(), getChannelName()),
LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
@Override

@ -145,6 +145,17 @@ public class MasterSlaveEntry {
entry.reset();
closeConnections(entry);
for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) {
reattachPubSub(connection, temporaryDown);
}
entry.getAllSubscribeConnections().clear();
return true;
}
private void closeConnections(ClientConnectionsEntry entry) {
// close all connections
while (true) {
final RedisConnection connection = entry.pollConnection();
@ -168,13 +179,6 @@ public class MasterSlaveEntry {
}
connection.closeAsync();
}
for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) {
reattachPubSub(connection, temporaryDown);
}
entry.getAllSubscribeConnections().clear();
return true;
}
private void reattachPubSub(RedisPubSubConnection redisPubSubConnection, boolean temporaryDown) {

@ -1,6 +1,6 @@
package org.redisson;
import static com.jayway.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;

@ -1,6 +1,6 @@
package org.redisson;
import static com.jayway.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
@ -63,7 +63,7 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
assertThat(queue.offer(6, 3, TimeUnit.SECONDS)).isTrue();
assertThat(System.currentTimeMillis() - start).isBetween(1000L, 2000L);
await().atMost(2, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue());
await().atMost(2, TimeUnit.SECONDS).until(() -> executed.get());
assertThat(queue).containsExactly(2, 3, 4, 5, 6);
@ -142,7 +142,7 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
queue1.put(4);
await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue());
await().atMost(5, TimeUnit.SECONDS).until(() -> executed.get());
assertThat(queue1).containsExactly(2, 3, 4);
@ -292,7 +292,7 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
t.join();
await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue());
await().atMost(5, TimeUnit.SECONDS).until(() -> executed.get());
redisson.shutdown();
runner.stop();

@ -1,5 +1,6 @@
package org.redisson;
import static org.awaitility.Awaitility.*;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -7,14 +8,11 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import static com.jayway.awaitility.Awaitility.*;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RLock;
import com.jayway.awaitility.Awaitility;
public class RedissonFairLockTest extends BaseConcurrentTest {
@Test
@ -144,7 +142,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
RLock lock = redisson.getFairLock("lock");
Awaitility.await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock.isLocked());
await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock.isLocked());
}
@Test
@ -377,7 +375,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
t1.start();
}
await().atMost(30, TimeUnit.SECONDS).until(() -> assertThat(lockedCounter.get()).isEqualTo(totalThreads));
await().atMost(30, TimeUnit.SECONDS).until(() -> lockedCounter.get() == totalThreads);
}

@ -12,7 +12,7 @@ import org.junit.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import com.jayway.awaitility.Awaitility;
import static org.awaitility.Awaitility.*;
public class RedissonLockTest extends BaseConcurrentTest {
@ -100,7 +100,7 @@ public class RedissonLockTest extends BaseConcurrentTest {
t.join();
r.shutdown();
Awaitility.await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock.isLocked());
await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock.isLocked());
}
@Test

@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.awaitility.Duration;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.MapOptions;
@ -36,8 +37,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.MsgPackJacksonCodec;
import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.Duration;
import static org.awaitility.Awaitility.*;
public class RedissonMapCacheTest extends BaseMapTest {
@ -1034,7 +1034,7 @@ public class RedissonMapCacheTest extends BaseMapTest {
});
runnable.run();
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(ref);
await().atMost(Duration.ONE_SECOND).untilTrue(ref);
map.removeListener(createListener1);
}
@ -1090,7 +1090,7 @@ public class RedissonMapCacheTest extends BaseMapTest {
});
runnable.run();
Awaitility.await().atMost(Duration.ONE_MINUTE).untilTrue(ref);
await().atMost(Duration.ONE_MINUTE).untilTrue(ref);
map.removeListener(createListener1);
}
@ -1113,7 +1113,7 @@ public class RedissonMapCacheTest extends BaseMapTest {
});
runnable.run();
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(ref);
await().atMost(Duration.ONE_SECOND).untilTrue(ref);
map.removeListener(createListener1);
}
@ -1148,7 +1148,7 @@ public class RedissonMapCacheTest extends BaseMapTest {
});
runnable.run();
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(ref);
await().atMost(Duration.ONE_SECOND).untilTrue(ref);
map.removeListener(createListener1);
}

@ -12,7 +12,7 @@ import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import static com.jayway.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonMultiLockTest {
@ -87,7 +87,7 @@ public class RedissonMultiLockTest {
t.start();
t.join();
await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue());
await().atMost(5, TimeUnit.SECONDS).until(() -> executed.get());
lock.unlock();

@ -1,6 +1,5 @@
package org.redisson;
import static com.jayway.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.security.SecureRandom;
@ -10,15 +9,66 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Duration;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import com.jayway.awaitility.Awaitility;
import static org.awaitility.Awaitility.*;
public class RedissonReadWriteLockTest extends BaseConcurrentTest {
@Test
public void testWriteLockExpiration() throws InterruptedException {
RReadWriteLock rw1 = redisson.getReadWriteLock("test2s3");
RLock l1 = rw1.writeLock();
assertThat(l1.tryLock(10000, 10000, TimeUnit.MILLISECONDS)).isTrue();
RLock l2 = rw1.writeLock();
assertThat(l2.tryLock(1000, 1000, TimeUnit.MILLISECONDS)).isTrue();
await().atMost(Duration.TEN_SECONDS).until(() -> {
RReadWriteLock rw2 = redisson.getReadWriteLock("test2s3");
try {
return !rw2.writeLock().tryLock(3000, 1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
});
}
@Test
public void testInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1)
.addNode(master2)
.addNode(master3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RReadWriteLock s = redisson.getReadWriteLock("1234");
s.writeLock().lock();
s.readLock().lock();
s.readLock().unlock();
s.writeLock().unlock();
redisson.shutdown();
process.shutdown();
}
@Test
public void testReadLockLeaseTimeoutDiffThreadsWRR() throws InterruptedException {
RLock writeLock = redisson.getReadWriteLock("my_read_write_lock").writeLock();
@ -319,7 +369,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
});
RReadWriteLock lock1 = redisson.getReadWriteLock("lock");
Awaitility.await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock1.writeLock().isLocked());
await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock1.writeLock().isLocked());
}
@Test

@ -17,7 +17,7 @@ import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import static com.jayway.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonRedLockTest {
@ -316,7 +316,7 @@ public class RedissonRedLockTest {
t.start();
t.join();
await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue());
await().atMost(5, TimeUnit.SECONDS).until(() -> executed.get());
lock.unlock();

@ -1,6 +1,6 @@
package org.redisson;
import static com.jayway.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.redisson.BaseTest.createInstance;
@ -289,8 +289,8 @@ public class RedissonTest {
Assert.assertEquals(0, pp.stop());
await().atMost(2, TimeUnit.SECONDS).until(() -> assertThat(connectCounter.get()).isEqualTo(1));
await().atMost(2, TimeUnit.SECONDS).until(() -> assertThat(disconnectCounter.get()).isEqualTo(1));
await().atMost(2, TimeUnit.SECONDS).until(() -> connectCounter.get() == 1);
await().atMost(2, TimeUnit.SECONDS).until(() -> disconnectCounter.get() == 1);
}
@Test

@ -1,6 +1,6 @@
package org.redisson;
import static com.jayway.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
import java.util.ArrayList;

@ -1,7 +1,7 @@
package org.redisson;
import static com.jayway.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
import java.io.Serializable;
@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Duration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -34,9 +35,6 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.Duration;
public class RedissonTopicTest {
@BeforeClass
@ -157,7 +155,7 @@ public class RedissonTopicTest {
});
topic1.publish(123L);
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived);
await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived);
redisson1.shutdown();
}
@ -178,7 +176,7 @@ public class RedissonTopicTest {
});
stringTopic.publish("testmsg");
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived);
await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived);
stringTopic.removeListener(listenerId);
}
@ -213,8 +211,8 @@ public class RedissonTopicTest {
});
longTopic.publish(1L);
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived);
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(longMessageReceived);
await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived);
await().atMost(Duration.ONE_SECOND).untilTrue(longMessageReceived);
}
@Test

@ -14,6 +14,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.awaitility.Duration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -28,8 +29,7 @@ import org.redisson.api.RExecutorService;
import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig;
import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.Duration;
import static org.awaitility.Awaitility.*;
public class RedissonExecutorServiceTest extends BaseTest {
@ -121,7 +121,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
e.execute(new IncrementRunnableTask("myCounter"), new IncrementRunnableTask("myCounter"),
new IncrementRunnableTask("myCounter"), new IncrementRunnableTask("myCounter"));
Awaitility.await().atMost(Duration.FIVE_SECONDS).until(() -> redisson.getAtomicLong("myCounter").get() == 4);
await().atMost(Duration.FIVE_SECONDS).until(() -> redisson.getAtomicLong("myCounter").get() == 4);
}
@Test

Loading…
Cancel
Save