diff --git a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index 65dd81ac2..ccf7b9411 100644 --- a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -162,6 +162,12 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle { @Override public void start() throws LifecycleException { + redisson = buildClient(); + + lifecycle.fireLifecycleEvent(START_EVENT, null); + } + + protected RedissonClient buildClient() throws LifecycleException { Config config = null; try { config = Config.fromJSON(new File(configPath), getClass().getClassLoader()); @@ -176,12 +182,10 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle { } try { - redisson = Redisson.create(config); + return Redisson.create(config); } catch (Exception e) { throw new LifecycleException(e); } - - lifecycle.fireLifecycleEvent(START_EVENT, null); } @Override diff --git a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index 45f87b10d..1af366e72 100644 --- a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -143,6 +143,13 @@ public class RedissonSessionManager extends ManagerBase { @Override protected void startInternal() throws LifecycleException { super.startInternal(); + + redisson = buildClient(); + + setState(LifecycleState.STARTING); + } + + protected RedissonClient buildClient() throws LifecycleException { Config config = null; try { config = Config.fromJSON(new File(configPath), getClass().getClassLoader()); @@ -166,12 +173,10 @@ public class RedissonSessionManager extends ManagerBase { throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e); } - redisson = Redisson.create(config); + return Redisson.create(config); } catch (Exception e) { throw new LifecycleException(e); } - - setState(LifecycleState.STARTING); } @Override diff --git a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index 21f71bfea..c170335f7 100644 --- a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -144,6 +144,12 @@ public class RedissonSessionManager extends ManagerBase { @Override protected void startInternal() throws LifecycleException { super.startInternal(); + redisson = buildClient(); + + setState(LifecycleState.STARTING); + } + + protected RedissonClient buildClient() throws LifecycleException { Config config = null; try { config = Config.fromJSON(new File(configPath), getClass().getClassLoader()); @@ -163,12 +169,10 @@ public class RedissonSessionManager extends ManagerBase { .newInstance(Thread.currentThread().getContextClassLoader()); config.setCodec(codec); - redisson = Redisson.create(config); + return Redisson.create(config); } catch (Exception e) { throw new LifecycleException(e); } - - setState(LifecycleState.STARTING); } @Override diff --git a/redisson/pom.xml b/redisson/pom.xml index d345336a3..bc8c9ddcf 100644 --- a/redisson/pom.xml +++ b/redisson/pom.xml @@ -81,9 +81,9 @@ test - com.jayway.awaitility + org.awaitility awaitility - 1.7.0 + 3.0.0 test diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java index b82bb0761..b24d83856 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java @@ -126,8 +126,7 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { - RFuture res = takeLastAndOfferFirstToAsync(queueName); - return res.await().getNow(); + return get(takeLastAndOfferFirstToAsync(queueName)); } @Override @@ -194,8 +193,7 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public V takeFirst() throws InterruptedException { - RFuture res = takeFirstAsync(); - return res.await().getNow(); + return get(takeFirstAsync()); } @Override @@ -210,8 +208,7 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public V takeLast() throws InterruptedException { - RFuture res = takeLastAsync(); - return res.await().getNow(); + return get(takeLastAsync()); } @Override @@ -221,8 +218,7 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { - RFuture res = pollFirstFromAnyAsync(timeout, unit, queueNames); - return res.await().getNow(); + return get(pollFirstFromAnyAsync(timeout, unit, queueNames)); } @Override @@ -232,8 +228,7 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { - RFuture res = pollLastFromAnyAsync(timeout, unit, queueNames); - return res.await().getNow(); + return get(pollLastFromAnyAsync(timeout, unit, queueNames)); } @Override @@ -249,8 +244,7 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public V pollFirst(long timeout, TimeUnit unit) throws InterruptedException { - RFuture res = pollFirstAsync(timeout, unit); - return res.await().getNow(); + return get(pollFirstAsync(timeout, unit)); } @Override @@ -260,8 +254,7 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public V pollLast(long timeout, TimeUnit unit) throws InterruptedException { - RFuture res = pollLastAsync(timeout, unit); - return res.await().getNow(); + return get(pollLastAsync(timeout, unit)); } } \ No newline at end of file diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java index 4f518e681..3809ade4d 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -79,8 +79,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock */ @Override public V take() throws InterruptedException { - RFuture res = takeAsync(); - return res.await().getNow(); + return get(takeAsync()); } @Override @@ -94,8 +93,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock */ @Override public V poll(long timeout, TimeUnit unit) throws InterruptedException { - RFuture res = pollAsync(timeout, unit); - return res.await().getNow(); + return get(pollAsync(timeout, unit)); } /* @@ -104,8 +102,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock */ @Override public V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { - RFuture res = pollFromAnyAsync(timeout, unit, queueNames); - return res.await().getNow(); + return get(pollFromAnyAsync(timeout, unit, queueNames)); } /* @@ -130,14 +127,12 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock @Override public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { - RFuture res = pollLastAndOfferFirstToAsync(queueName, timeout, unit); - return res.await().getNow(); + return get(pollLastAndOfferFirstToAsync(queueName, timeout, unit)); } @Override public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { - RFuture res = takeLastAndOfferFirstToAsync(queueName); - return res.await().getNow(); + return get(takeLastAndOfferFirstToAsync(queueName)); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index f754571a3..b4231f0d6 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -31,7 +31,6 @@ import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; import org.redisson.connection.decoder.ListDrainToDecoder; -import org.redisson.misc.PromiseDelegator; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.pubsub.SemaphorePubSub; @@ -256,8 +255,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements @Override public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { - RFuture res = takeLastAndOfferFirstToAsync(queueName); - return res.await().getNow(); + return get(takeLastAndOfferFirstToAsync(queueName)); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonReadLock.java b/redisson/src/main/java/org/redisson/RedissonReadLock.java index 4bf5f770b..096a335ac 100644 --- a/redisson/src/main/java/org/redisson/RedissonReadLock.java +++ b/redisson/src/main/java/org/redisson/RedissonReadLock.java @@ -53,6 +53,10 @@ public class RedissonReadLock extends RedissonLock implements RLock { return super.getLockName(threadId) + ":write"; } + String getReadWriteTimeoutNamePrefix(long threadId) { + return suffixName(getName(), getLockName(threadId)) + ":rwlock_timeout"; + } + @Override RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); @@ -62,25 +66,27 @@ public class RedissonReadLock extends RedissonLock implements RLock { "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'read'); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + - "redis.call('set', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:1', 1); " + - "redis.call('pexpire', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:1', ARGV[1]); " + + "redis.call('set', KEYS[2] .. ':1', 1); " + + "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " + "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + - "local key = KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:' .. ind;" + + "local key = KEYS[2] .. ':' .. ind;" + "redis.call('set', key, 1); " + "redis.call('pexpire', key, ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end;" + "return redis.call('pttl', KEYS[1]);", - Arrays.asList(getName()), internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId)); + Arrays.asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), + internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId)); } @Override protected RFuture unlockInnerAsync(long threadId) { + String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + @@ -96,7 +102,7 @@ public class RedissonReadLock extends RedissonLock implements RLock { "if (counter == 0) then " + "redis.call('hdel', KEYS[1], ARGV[2]); " + "end;" + - "redis.call('del', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:' .. (counter+1)); " + + "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " + "if (redis.call('hlen', KEYS[1]) > 1) then " + "local maxRemainTime = -3; " + "local keys = redis.call('hkeys', KEYS[1]); " + @@ -104,7 +110,7 @@ public class RedissonReadLock extends RedissonLock implements RLock { "counter = tonumber(redis.call('hget', KEYS[1], key)); " + "if type(counter) == 'number' then " + "for i=counter, 1, -1 do " + - "local remainTime = redis.call('pttl', KEYS[1] .. ':' .. key .. ':rwlock_timeout:' .. i); " + + "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + "maxRemainTime = math.max(remainTime, maxRemainTime);" + "end; " + "end; " + @@ -123,7 +129,8 @@ public class RedissonReadLock extends RedissonLock implements RLock { "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; ", - Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, getLockName(threadId)); + Arrays.asList(getName(), getChannelName(), timeoutPrefix, timeoutPrefix.split(":")[0]), + LockPubSub.unlockMessage, getLockName(threadId)); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonWriteLock.java b/redisson/src/main/java/org/redisson/RedissonWriteLock.java index 865189236..c5e9b709f 100644 --- a/redisson/src/main/java/org/redisson/RedissonWriteLock.java +++ b/redisson/src/main/java/org/redisson/RedissonWriteLock.java @@ -68,13 +68,15 @@ public class RedissonWriteLock extends RedissonLock implements RLock { "end; " + "if (mode == 'write') then " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + - "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + - "redis.call('pexpire', KEYS[1], ARGV[1]); " + + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + + "local currentExpire = redis.call('pttl', KEYS[1]); " + + "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " + "return nil; " + "end; " + "end;" + "return redis.call('pttl', KEYS[1]);", - Arrays.asList(getName()), internalLockLeaseTime, getLockName(threadId)); + Arrays.asList(getName()), + internalLockLeaseTime, getLockName(threadId)); } @Override @@ -108,7 +110,8 @@ public class RedissonWriteLock extends RedissonLock implements RLock { "end; " + "end; " + "return nil;", - Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); + Arrays.asList(getName(), getChannelName()), + LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); } @Override diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 58ddeee3d..a461433c0 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -145,6 +145,17 @@ public class MasterSlaveEntry { entry.reset(); + closeConnections(entry); + + for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) { + reattachPubSub(connection, temporaryDown); + } + entry.getAllSubscribeConnections().clear(); + + return true; + } + + private void closeConnections(ClientConnectionsEntry entry) { // close all connections while (true) { final RedisConnection connection = entry.pollConnection(); @@ -168,13 +179,6 @@ public class MasterSlaveEntry { } connection.closeAsync(); } - - for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) { - reattachPubSub(connection, temporaryDown); - } - entry.getAllSubscribeConnections().clear(); - - return true; } private void reattachPubSub(RedisPubSubConnection redisPubSubConnection, boolean temporaryDown) { diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 4dab4f004..11fbd9ec2 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -1,6 +1,6 @@ package org.redisson; -import static com.jayway.awaitility.Awaitility.await; +import static org.awaitility.Awaitility.await; import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; diff --git a/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java index 3d533f5d4..75fbac8b1 100644 --- a/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java @@ -1,6 +1,6 @@ package org.redisson; -import static com.jayway.awaitility.Awaitility.await; +import static org.awaitility.Awaitility.await; import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; @@ -63,7 +63,7 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest { assertThat(queue.offer(6, 3, TimeUnit.SECONDS)).isTrue(); assertThat(System.currentTimeMillis() - start).isBetween(1000L, 2000L); - await().atMost(2, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue()); + await().atMost(2, TimeUnit.SECONDS).until(() -> executed.get()); assertThat(queue).containsExactly(2, 3, 4, 5, 6); @@ -142,7 +142,7 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest { queue1.put(4); - await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue()); + await().atMost(5, TimeUnit.SECONDS).until(() -> executed.get()); assertThat(queue1).containsExactly(2, 3, 4); @@ -292,7 +292,7 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest { t.join(); - await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue()); + await().atMost(5, TimeUnit.SECONDS).until(() -> executed.get()); redisson.shutdown(); runner.stop(); diff --git a/redisson/src/test/java/org/redisson/RedissonFairLockTest.java b/redisson/src/test/java/org/redisson/RedissonFairLockTest.java index e935e8872..0f156b3c6 100644 --- a/redisson/src/test/java/org/redisson/RedissonFairLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonFairLockTest.java @@ -1,5 +1,6 @@ package org.redisson; +import static org.awaitility.Awaitility.*; import static org.assertj.core.api.Assertions.assertThat; import java.util.concurrent.ConcurrentLinkedQueue; @@ -7,14 +8,11 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; -import static com.jayway.awaitility.Awaitility.*; import org.junit.Assert; import org.junit.Test; import org.redisson.api.RLock; -import com.jayway.awaitility.Awaitility; - public class RedissonFairLockTest extends BaseConcurrentTest { @Test @@ -144,7 +142,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { Assert.assertTrue(latch.await(1, TimeUnit.SECONDS)); RLock lock = redisson.getFairLock("lock"); - Awaitility.await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock.isLocked()); + await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock.isLocked()); } @Test @@ -377,7 +375,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { t1.start(); } - await().atMost(30, TimeUnit.SECONDS).until(() -> assertThat(lockedCounter.get()).isEqualTo(totalThreads)); + await().atMost(30, TimeUnit.SECONDS).until(() -> lockedCounter.get() == totalThreads); } diff --git a/redisson/src/test/java/org/redisson/RedissonLockTest.java b/redisson/src/test/java/org/redisson/RedissonLockTest.java index 994b4c4af..78d5f8e7d 100644 --- a/redisson/src/test/java/org/redisson/RedissonLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLockTest.java @@ -12,7 +12,7 @@ import org.junit.Test; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; -import com.jayway.awaitility.Awaitility; +import static org.awaitility.Awaitility.*; public class RedissonLockTest extends BaseConcurrentTest { @@ -100,7 +100,7 @@ public class RedissonLockTest extends BaseConcurrentTest { t.join(); r.shutdown(); - Awaitility.await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock.isLocked()); + await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock.isLocked()); } @Test diff --git a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java index 36ab07e08..14aa94e31 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java @@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import org.awaitility.Duration; import org.junit.Assert; import org.junit.Test; import org.redisson.api.MapOptions; @@ -36,8 +37,7 @@ import org.redisson.client.codec.StringCodec; import org.redisson.codec.JsonJacksonCodec; import org.redisson.codec.MsgPackJacksonCodec; -import com.jayway.awaitility.Awaitility; -import com.jayway.awaitility.Duration; +import static org.awaitility.Awaitility.*; public class RedissonMapCacheTest extends BaseMapTest { @@ -1034,7 +1034,7 @@ public class RedissonMapCacheTest extends BaseMapTest { }); runnable.run(); - Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(ref); + await().atMost(Duration.ONE_SECOND).untilTrue(ref); map.removeListener(createListener1); } @@ -1090,7 +1090,7 @@ public class RedissonMapCacheTest extends BaseMapTest { }); runnable.run(); - Awaitility.await().atMost(Duration.ONE_MINUTE).untilTrue(ref); + await().atMost(Duration.ONE_MINUTE).untilTrue(ref); map.removeListener(createListener1); } @@ -1113,7 +1113,7 @@ public class RedissonMapCacheTest extends BaseMapTest { }); runnable.run(); - Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(ref); + await().atMost(Duration.ONE_SECOND).untilTrue(ref); map.removeListener(createListener1); } @@ -1148,7 +1148,7 @@ public class RedissonMapCacheTest extends BaseMapTest { }); runnable.run(); - Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(ref); + await().atMost(Duration.ONE_SECOND).untilTrue(ref); map.removeListener(createListener1); } diff --git a/redisson/src/test/java/org/redisson/RedissonMultiLockTest.java b/redisson/src/test/java/org/redisson/RedissonMultiLockTest.java index 0a3d832ed..de4612ba2 100644 --- a/redisson/src/test/java/org/redisson/RedissonMultiLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMultiLockTest.java @@ -12,7 +12,7 @@ import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; -import static com.jayway.awaitility.Awaitility.await; +import static org.awaitility.Awaitility.await; import static org.assertj.core.api.Assertions.assertThat; public class RedissonMultiLockTest { @@ -87,7 +87,7 @@ public class RedissonMultiLockTest { t.start(); t.join(); - await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue()); + await().atMost(5, TimeUnit.SECONDS).until(() -> executed.get()); lock.unlock(); diff --git a/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java b/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java index b4563eee6..ca411e5b6 100644 --- a/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java @@ -1,6 +1,5 @@ package org.redisson; -import static com.jayway.awaitility.Awaitility.await; import static org.assertj.core.api.Assertions.assertThat; import java.security.SecureRandom; @@ -10,15 +9,66 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.awaitility.Duration; import org.junit.Assert; import org.junit.Test; +import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.api.RLock; import org.redisson.api.RReadWriteLock; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; -import com.jayway.awaitility.Awaitility; +import static org.awaitility.Awaitility.*; public class RedissonReadWriteLockTest extends BaseConcurrentTest { + @Test + public void testWriteLockExpiration() throws InterruptedException { + RReadWriteLock rw1 = redisson.getReadWriteLock("test2s3"); + + RLock l1 = rw1.writeLock(); + assertThat(l1.tryLock(10000, 10000, TimeUnit.MILLISECONDS)).isTrue(); + RLock l2 = rw1.writeLock(); + assertThat(l2.tryLock(1000, 1000, TimeUnit.MILLISECONDS)).isTrue(); + + await().atMost(Duration.TEN_SECONDS).until(() -> { + RReadWriteLock rw2 = redisson.getReadWriteLock("test2s3"); + try { + return !rw2.writeLock().tryLock(3000, 1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + return false; + } + }); + } + + @Test + public void testInCluster() throws Exception { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1) + .addNode(master2) + .addNode(master3); + ClusterProcesses process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + RReadWriteLock s = redisson.getReadWriteLock("1234"); + s.writeLock().lock(); + s.readLock().lock(); + s.readLock().unlock(); + s.writeLock().unlock(); + + redisson.shutdown(); + process.shutdown(); + } + @Test public void testReadLockLeaseTimeoutDiffThreadsWRR() throws InterruptedException { RLock writeLock = redisson.getReadWriteLock("my_read_write_lock").writeLock(); @@ -319,7 +369,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { }); RReadWriteLock lock1 = redisson.getReadWriteLock("lock"); - Awaitility.await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock1.writeLock().isLocked()); + await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock1.writeLock().isLocked()); } @Test diff --git a/redisson/src/test/java/org/redisson/RedissonRedLockTest.java b/redisson/src/test/java/org/redisson/RedissonRedLockTest.java index b15180870..8c15c20c0 100644 --- a/redisson/src/test/java/org/redisson/RedissonRedLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRedLockTest.java @@ -17,7 +17,7 @@ import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; -import static com.jayway.awaitility.Awaitility.await; +import static org.awaitility.Awaitility.await; import static org.assertj.core.api.Assertions.assertThat; public class RedissonRedLockTest { @@ -316,7 +316,7 @@ public class RedissonRedLockTest { t.start(); t.join(); - await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue()); + await().atMost(5, TimeUnit.SECONDS).until(() -> executed.get()); lock.unlock(); diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index 3660ef171..c35c02ed4 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -1,6 +1,6 @@ package org.redisson; -import static com.jayway.awaitility.Awaitility.await; +import static org.awaitility.Awaitility.*; import static org.assertj.core.api.Assertions.assertThat; import static org.redisson.BaseTest.createInstance; @@ -289,8 +289,8 @@ public class RedissonTest { Assert.assertEquals(0, pp.stop()); - await().atMost(2, TimeUnit.SECONDS).until(() -> assertThat(connectCounter.get()).isEqualTo(1)); - await().atMost(2, TimeUnit.SECONDS).until(() -> assertThat(disconnectCounter.get()).isEqualTo(1)); + await().atMost(2, TimeUnit.SECONDS).until(() -> connectCounter.get() == 1); + await().atMost(2, TimeUnit.SECONDS).until(() -> disconnectCounter.get() == 1); } @Test diff --git a/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java b/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java index e13e42e5d..0dc2f4cb8 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java @@ -1,6 +1,6 @@ package org.redisson; -import static com.jayway.awaitility.Awaitility.await; +import static org.awaitility.Awaitility.await; import java.io.IOException; import java.util.ArrayList; diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 5eacbddd9..535abb755 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -1,7 +1,7 @@ package org.redisson; -import static com.jayway.awaitility.Awaitility.await; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import java.io.IOException; import java.io.Serializable; @@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.awaitility.Duration; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -34,9 +35,6 @@ import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.config.Config; -import com.jayway.awaitility.Awaitility; -import com.jayway.awaitility.Duration; - public class RedissonTopicTest { @BeforeClass @@ -157,7 +155,7 @@ public class RedissonTopicTest { }); topic1.publish(123L); - Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived); + await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived); redisson1.shutdown(); } @@ -178,7 +176,7 @@ public class RedissonTopicTest { }); stringTopic.publish("testmsg"); - Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived); + await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived); stringTopic.removeListener(listenerId); } @@ -213,8 +211,8 @@ public class RedissonTopicTest { }); longTopic.publish(1L); - Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived); - Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(longMessageReceived); + await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived); + await().atMost(Duration.ONE_SECOND).untilTrue(longMessageReceived); } @Test diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 514662deb..5ba5e9a9e 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -14,6 +14,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.awaitility.Duration; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -28,8 +29,7 @@ import org.redisson.api.RExecutorService; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; -import com.jayway.awaitility.Awaitility; -import com.jayway.awaitility.Duration; +import static org.awaitility.Awaitility.*; public class RedissonExecutorServiceTest extends BaseTest { @@ -121,7 +121,7 @@ public class RedissonExecutorServiceTest extends BaseTest { e.execute(new IncrementRunnableTask("myCounter"), new IncrementRunnableTask("myCounter"), new IncrementRunnableTask("myCounter"), new IncrementRunnableTask("myCounter")); - Awaitility.await().atMost(Duration.FIVE_SECONDS).until(() -> redisson.getAtomicLong("myCounter").get() == 4); + await().atMost(Duration.FIVE_SECONDS).until(() -> redisson.getAtomicLong("myCounter").get() == 4); } @Test