diff --git a/CHANGELOG.md b/CHANGELOG.md
index 073e729af..b57a4b876 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,24 @@ Redisson Releases History
Try __[Redisson PRO](https://redisson.pro)__ version.
+### 02-Jun-2018 - versions 2.12.1 and 3.7.1 released
+Feature - `RRateLimiter` object moved to open-source version
+Feature - ExecutorService task failover. Default failover interval is 60 seconds
+Feature - `RScoredSortedSet.pollFirst` and `pollLast` methods with count parameter added
+Feature - `RScoredSortedSet.pollFirst` and `pollLast` methods with timeout added
+Feature - `RScoredSortedSet.pollFirstFromAny` and `pollLastFromAny` methods added
+Improvement - `Node.time()` method returns `Time` object
+Improvement - RListReactive, RMapCacheReactive, RSetCacheReactive and RSetReactive are up-to-date to Async interfaces
+Fixed - setPingConnectionInterval is not propagated for single server configuration
+Fixed - ClusterConnectionManager should use shared resolverGroup
+Fixed - value can't be added to BloomFilter
+Fixed - Redis nodes with noaddr flag should be parsed correctly
+Fixed - methods belongs to transactional objects get blocked at high concurrency
+Fixed - Collection iterator doesn't use the same Redis node
+Fixed - ExecuteService response queue expiration time set to one hour
+Fixed - Executed remote tasks are not removed from Redis
+Fixed - `reconnectionTimeout` and `failedAttempts` renamed in xsd schema
+
### 14-May-2018 - versions 2.12.0 and 3.7.0 released
Feature - __Proxy mode__ Please refer to [documentation](https://github.com/redisson/redisson/wiki/2.-Configuration#29-proxy-mode) for more details
Feature - __Transaction API implementation__ Please refer to [documentation](https://github.com/redisson/redisson/wiki/10.-additional-features/#104-transactions) for more details
diff --git a/README.md b/README.md
index 8e3b45ed4..5c1487a73 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,13 @@
Redisson: Redis based In-Memory Data Grid for Java.
State of the Art Redis client
====
-[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.7.0) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)**
+[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.7.1) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)**
Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework.
| Stable
Release Version | Release Date | JDK Version
compatibility | `CompletionStage`
support | `ProjectReactor` version
compatibility |
| ------------- | ------------- | ------------| -----------| -----------|
-| 3.7.0 | 14.05.2018 | 1.8, 1.9, 1.10+ | Yes | 3.1.x |
-| 2.12.0 | 14.05.2018 | 1.6, 1.7, 1.8, 1.9, 1.10, Android | No | 2.0.8 |
+| 3.7.1 | 02.06.2018 | 1.8, 1.9, 1.10+ | Yes | 3.1.x |
+| 2.12.1 | 02.06.2018 | 1.6, 1.7, 1.8, 1.9, 1.10, Android | No | 2.0.8 |
Features
@@ -89,6 +89,8 @@ Success stories
## [Moving from Hazelcast to Redis / Datorama](https://engineering.datorama.com/moving-from-hazelcast-to-redis-b90a0769d1cb)
## [Distributed Locking with Redis (Migration from Hazelcast) / ContaAzul](https://carlosbecker.com/posts/distributed-locks-redis/)
+## [Migrating from Coherence to Redis / RCI](https://www.youtube.com/watch?v=JF5R2ucKTEg)
+
Quick start
===============================
@@ -98,23 +100,23 @@ Quick start
org.redisson
redisson
- 3.7.0
+ 3.7.1
org.redisson
redisson
- 2.12.0
+ 2.12.1
#### Gradle
// JDK 1.8+ compatible
- compile 'org.redisson:redisson:3.7.0'
+ compile 'org.redisson:redisson:3.7.1'
// JDK 1.6+ compatible
- compile 'org.redisson:redisson:2.12.0'
+ compile 'org.redisson:redisson:2.12.1'
#### Java
@@ -139,11 +141,11 @@ RExecutorService executor = redisson.getExecutorService("myExecutorService");
Downloads
===============================
-[Redisson 3.7.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.7.0&e=jar),
-[Redisson node 3.7.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.7.0&e=jar)
+[Redisson 3.7.1](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.7.1&e=jar),
+[Redisson node 3.7.1](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.7.1&e=jar)
-[Redisson 2.12.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.12.0&e=jar),
-[Redisson node 2.12.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.12.0&e=jar)
+[Redisson 2.12.1](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.12.1&e=jar),
+[Redisson node 2.12.1](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.12.1&e=jar)
FAQs
===============================
diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java
index 50ceea08b..a4c18c7ec 100644
--- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java
+++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java
@@ -151,14 +151,6 @@ public class RedissonLocalCachedMap extends RedissonMap implements R
instanceId = generateId();
syncStrategy = options.getSyncStrategy();
-
- if (options.getSyncStrategy() != SyncStrategy.NONE) {
- invalidateEntryOnChange = 1;
- }
- if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) {
- invalidateEntryOnChange = 2;
- evictionScheduler.schedule(listener.getUpdatesLogName(), cacheUpdateLogTime + TimeUnit.MINUTES.toMillis(1));
- }
cache = createCache(options);
@@ -174,6 +166,14 @@ public class RedissonLocalCachedMap extends RedissonMap implements R
};
listener.add();
+
+ if (options.getSyncStrategy() != SyncStrategy.NONE) {
+ invalidateEntryOnChange = 1;
+ }
+ if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) {
+ invalidateEntryOnChange = 2;
+ evictionScheduler.schedule(listener.getUpdatesLogName(), cacheUpdateLogTime + TimeUnit.MINUTES.toMillis(1));
+ }
}
private void cachePut(CacheKey cacheKey, Object key, Object value) {
diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java
index 023f9b267..bd848489b 100644
--- a/redisson/src/main/java/org/redisson/RedissonLock.java
+++ b/redisson/src/main/java/org/redisson/RedissonLock.java
@@ -575,38 +575,34 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return;
}
- // waiting for message
final RedissonLockEntry entry = getEntry(currentThreadId);
- synchronized (entry) {
- if (entry.getLatch().tryAcquire()) {
- lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
- } else {
- final AtomicReference futureRef = new AtomicReference();
- final Runnable listener = new Runnable() {
+ if (entry.getLatch().tryAcquire()) {
+ lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
+ } else {
+ // waiting for message
+ final AtomicReference futureRef = new AtomicReference();
+ final Runnable listener = new Runnable() {
+ @Override
+ public void run() {
+ if (futureRef.get() != null) {
+ futureRef.get().cancel();
+ }
+ lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
+ }
+ };
+
+ entry.addListener(listener);
+
+ if (ttl >= 0) {
+ Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
- public void run() {
- if (futureRef.get() != null) {
- futureRef.get().cancel();
+ public void run(Timeout timeout) throws Exception {
+ if (entry.removeListener(listener)) {
+ lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
}
- lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
}
- };
-
- entry.addListener(listener);
-
- if (ttl >= 0) {
- Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- synchronized (entry) {
- if (entry.removeListener(listener)) {
- lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
- }
- }
- }
- }, ttl, TimeUnit.MILLISECONDS);
- futureRef.set(scheduledFuture);
- }
+ }, ttl, TimeUnit.MILLISECONDS);
+ futureRef.set(scheduledFuture);
}
}
}
@@ -768,48 +764,44 @@ public class RedissonLock extends RedissonExpirable implements RLock {
// waiting for message
final long current = System.currentTimeMillis();
final RedissonLockEntry entry = getEntry(currentThreadId);
- synchronized (entry) {
- if (entry.getLatch().tryAcquire()) {
- tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
- } else {
- final AtomicBoolean executed = new AtomicBoolean();
- final AtomicReference futureRef = new AtomicReference();
- final Runnable listener = new Runnable() {
- @Override
- public void run() {
- executed.set(true);
- if (futureRef.get() != null) {
- futureRef.get().cancel();
- }
-
- long elapsed = System.currentTimeMillis() - current;
- time.addAndGet(-elapsed);
-
- tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
+ if (entry.getLatch().tryAcquire()) {
+ tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
+ } else {
+ final AtomicBoolean executed = new AtomicBoolean();
+ final AtomicReference futureRef = new AtomicReference();
+ final Runnable listener = new Runnable() {
+ @Override
+ public void run() {
+ executed.set(true);
+ if (futureRef.get() != null) {
+ futureRef.get().cancel();
}
- };
- entry.addListener(listener);
- long t = time.get();
- if (ttl >= 0 && ttl < time.get()) {
- t = ttl;
+ long elapsed = System.currentTimeMillis() - current;
+ time.addAndGet(-elapsed);
+
+ tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
}
- if (!executed.get()) {
- Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- synchronized (entry) {
- if (entry.removeListener(listener)) {
- long elapsed = System.currentTimeMillis() - current;
- time.addAndGet(-elapsed);
-
- tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
- }
- }
+ };
+ entry.addListener(listener);
+
+ long t = time.get();
+ if (ttl >= 0 && ttl < time.get()) {
+ t = ttl;
+ }
+ if (!executed.get()) {
+ Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (entry.removeListener(listener)) {
+ long elapsed = System.currentTimeMillis() - current;
+ time.addAndGet(-elapsed);
+
+ tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
}
- }, t, TimeUnit.MILLISECONDS);
- futureRef.set(scheduledFuture);
- }
+ }
+ }, t, TimeUnit.MILLISECONDS);
+ futureRef.set(scheduledFuture);
}
}
}
diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java
index ae288171a..c09b62061 100644
--- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java
+++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java
@@ -213,40 +213,17 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
// waiting for message
final long current = System.currentTimeMillis();
final RedissonLockEntry entry = getEntry();
- synchronized (entry) {
- if (entry.getLatch().tryAcquire()) {
- tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
- } else {
- final AtomicReference waitTimeoutFutureRef = new AtomicReference();
-
- final Timeout scheduledFuture;
- if (nearestTimeout != null) {
- scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- if (waitTimeoutFutureRef.get() != null && !waitTimeoutFutureRef.get().cancel()) {
- return;
- }
-
- long elapsed = System.currentTimeMillis() - current;
- time.addAndGet(-elapsed);
-
- tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
- }
- }, nearestTimeout, TimeUnit.MILLISECONDS);
- } else {
- scheduledFuture = null;
- }
+ if (entry.getLatch().tryAcquire()) {
+ tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
+ } else {
+ final AtomicReference waitTimeoutFutureRef = new AtomicReference();
- final Runnable listener = new Runnable() {
+ final Timeout scheduledFuture;
+ if (nearestTimeout != null) {
+ scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
- public void run() {
+ public void run(Timeout timeout) throws Exception {
if (waitTimeoutFutureRef.get() != null && !waitTimeoutFutureRef.get().cancel()) {
- entry.getLatch().release();
- return;
- }
- if (scheduledFuture != null && !scheduledFuture.cancel()) {
- entry.getLatch().release();
return;
}
@@ -255,29 +232,48 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
}
- };
- entry.addListener(listener);
+ }, nearestTimeout, TimeUnit.MILLISECONDS);
+ } else {
+ scheduledFuture = null;
+ }
- long t = time.get();
- Timeout waitTimeoutFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- if (scheduledFuture != null && !scheduledFuture.cancel()) {
- return;
- }
+ final Runnable listener = new Runnable() {
+ @Override
+ public void run() {
+ if (waitTimeoutFutureRef.get() != null && !waitTimeoutFutureRef.get().cancel()) {
+ entry.getLatch().release();
+ return;
+ }
+ if (scheduledFuture != null && !scheduledFuture.cancel()) {
+ entry.getLatch().release();
+ return;
+ }
+
+ long elapsed = System.currentTimeMillis() - current;
+ time.addAndGet(-elapsed);
- synchronized (entry) {
- if (entry.removeListener(listener)) {
- long elapsed = System.currentTimeMillis() - current;
- time.addAndGet(-elapsed);
-
- tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
- }
- }
+ tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
+ }
+ };
+ entry.addListener(listener);
+
+ long t = time.get();
+ Timeout waitTimeoutFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (scheduledFuture != null && !scheduledFuture.cancel()) {
+ return;
}
- }, t, TimeUnit.MILLISECONDS);
- waitTimeoutFutureRef.set(waitTimeoutFuture);
- }
+
+ if (entry.removeListener(listener)) {
+ long elapsed = System.currentTimeMillis() - current;
+ time.addAndGet(-elapsed);
+
+ tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
+ }
+ }
+ }, t, TimeUnit.MILLISECONDS);
+ waitTimeoutFutureRef.set(waitTimeoutFuture);
}
}
});
@@ -318,34 +314,32 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
final RedissonLockEntry entry = getEntry();
- synchronized (entry) {
- if (entry.getLatch().tryAcquire(permits)) {
- acquireAsync(permits, subscribeFuture, result, ttl, timeUnit);
- } else {
- final Timeout scheduledFuture;
- if (nearestTimeout != null) {
- scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- acquireAsync(permits, subscribeFuture, result, ttl, timeUnit);
- }
- }, nearestTimeout, TimeUnit.MILLISECONDS);
- } else {
- scheduledFuture = null;
- }
-
- Runnable listener = new Runnable() {
+ if (entry.getLatch().tryAcquire(permits)) {
+ acquireAsync(permits, subscribeFuture, result, ttl, timeUnit);
+ } else {
+ final Timeout scheduledFuture;
+ if (nearestTimeout != null) {
+ scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
- public void run() {
- if (scheduledFuture != null && !scheduledFuture.cancel()) {
- entry.getLatch().release();
- return;
- }
+ public void run(Timeout timeout) throws Exception {
acquireAsync(permits, subscribeFuture, result, ttl, timeUnit);
}
- };
- entry.addListener(listener);
+ }, nearestTimeout, TimeUnit.MILLISECONDS);
+ } else {
+ scheduledFuture = null;
}
+
+ Runnable listener = new Runnable() {
+ @Override
+ public void run() {
+ if (scheduledFuture != null && !scheduledFuture.cancel()) {
+ entry.getLatch().release();
+ return;
+ }
+ acquireAsync(permits, subscribeFuture, result, ttl, timeUnit);
+ }
+ };
+ entry.addListener(listener);
}
}
});
diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java
index 4f5d9d086..f12f6b795 100644
--- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java
+++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java
@@ -180,45 +180,41 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
// waiting for message
final long current = System.currentTimeMillis();
final RedissonLockEntry entry = getEntry();
- synchronized (entry) {
- if (entry.getLatch().tryAcquire()) {
- tryAcquireAsync(time, permits, subscribeFuture, result);
- } else {
- final AtomicBoolean executed = new AtomicBoolean();
- final AtomicReference futureRef = new AtomicReference();
- final Runnable listener = new Runnable() {
+ if (entry.getLatch().tryAcquire()) {
+ tryAcquireAsync(time, permits, subscribeFuture, result);
+ } else {
+ final AtomicBoolean executed = new AtomicBoolean();
+ final AtomicReference futureRef = new AtomicReference();
+ final Runnable listener = new Runnable() {
+ @Override
+ public void run() {
+ executed.set(true);
+ if (futureRef.get() != null && !futureRef.get().cancel()) {
+ entry.getLatch().release();
+ return;
+ }
+ long elapsed = System.currentTimeMillis() - current;
+ time.addAndGet(-elapsed);
+
+ tryAcquireAsync(time, permits, subscribeFuture, result);
+ }
+ };
+ entry.addListener(listener);
+
+ long t = time.get();
+ if (!executed.get()) {
+ Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
- public void run() {
- executed.set(true);
- if (futureRef.get() != null && !futureRef.get().cancel()) {
- entry.getLatch().release();
- return;
+ public void run(Timeout timeout) throws Exception {
+ if (entry.removeListener(listener)) {
+ long elapsed = System.currentTimeMillis() - current;
+ time.addAndGet(-elapsed);
+
+ tryAcquireAsync(time, permits, subscribeFuture, result);
}
- long elapsed = System.currentTimeMillis() - current;
- time.addAndGet(-elapsed);
-
- tryAcquireAsync(time, permits, subscribeFuture, result);
}
- };
- entry.addListener(listener);
-
- long t = time.get();
- if (!executed.get()) {
- Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- synchronized (entry) {
- if (entry.removeListener(listener)) {
- long elapsed = System.currentTimeMillis() - current;
- time.addAndGet(-elapsed);
-
- tryAcquireAsync(time, permits, subscribeFuture, result);
- }
- }
- }
- }, t, TimeUnit.MILLISECONDS);
- futureRef.set(scheduledFuture);
- }
+ }, t, TimeUnit.MILLISECONDS);
+ futureRef.set(scheduledFuture);
}
}
}
@@ -251,18 +247,16 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
final RedissonLockEntry entry = getEntry();
- synchronized (entry) {
- if (entry.getLatch().tryAcquire(permits)) {
- acquireAsync(permits, subscribeFuture, result);
- } else {
- Runnable listener = new Runnable() {
- @Override
- public void run() {
- acquireAsync(permits, subscribeFuture, result);
- }
- };
- entry.addListener(listener);
- }
+ if (entry.getLatch().tryAcquire(permits)) {
+ acquireAsync(permits, subscribeFuture, result);
+ } else {
+ Runnable listener = new Runnable() {
+ @Override
+ public void run() {
+ acquireAsync(permits, subscribeFuture, result);
+ }
+ };
+ entry.addListener(listener);
}
}
});
diff --git a/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java b/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java
index 1ed966325..e3e913027 100644
--- a/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java
+++ b/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java
@@ -35,12 +35,8 @@ public class LockPubSub extends PublishSubscribe {
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(unlockMessage)) {
- while (true) {
- Runnable runnableToExecute = value.getListeners().poll();
- if (runnableToExecute == null) {
- break;
- }
-
+ Runnable runnableToExecute = value.getListeners().poll();
+ if (runnableToExecute != null) {
runnableToExecute.run();
}
diff --git a/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java b/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java
index c76a886f2..6288b571a 100644
--- a/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java
+++ b/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java
@@ -32,12 +32,8 @@ public class SemaphorePubSub extends PublishSubscribe {
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
- while (true) {
- Runnable runnableToExecute = value.getListeners().poll();
- if (runnableToExecute == null) {
- break;
- }
-
+ Runnable runnableToExecute = value.getListeners().poll();
+ if (runnableToExecute != null) {
runnableToExecute.run();
}
diff --git a/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java b/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java
index 27c7c4240..e3f71c352 100644
--- a/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java
+++ b/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java
@@ -22,8 +22,8 @@ public abstract class RedissonBaseTransactionalMapTest extends BaseTest {
@Test
public void testFastPut() throws InterruptedException {
- ExecutorService executor = Executors.newFixedThreadPool(16);
- for (int i = 0; i < 500; i++) {
+ ExecutorService executor = Executors.newFixedThreadPool(200);
+ for (int i = 0; i < 2000; i++) {
executor.submit(() -> {
for (int j = 0; j < 100; j++) {
RTransaction t = redisson.createTransaction(TransactionOptions.defaults());
@@ -35,7 +35,7 @@ public abstract class RedissonBaseTransactionalMapTest extends BaseTest {
}
executor.shutdown();
- assertThat(executor.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+ assertThat(executor.awaitTermination(2, TimeUnit.MINUTES)).isTrue();
}