From eb36207f93add5faa4b2b58677efa1f19cfab426 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 4 Jun 2018 12:08:51 +0300 Subject: [PATCH] Fixed - methods belongs to transactional objects get blocked at high concurrency. #1459 --- .../main/java/org/redisson/RedissonLock.java | 124 +++++++-------- .../RedissonPermitExpirableSemaphore.java | 144 +++++++++--------- .../java/org/redisson/RedissonSemaphore.java | 90 +++++------ .../java/org/redisson/pubsub/LockPubSub.java | 8 +- .../org/redisson/pubsub/SemaphorePubSub.java | 8 +- .../RedissonBaseTransactionalMapTest.java | 6 +- 6 files changed, 176 insertions(+), 204 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index 023f9b267..bd848489b 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -575,38 +575,34 @@ public class RedissonLock extends RedissonExpirable implements RLock { return; } - // waiting for message final RedissonLockEntry entry = getEntry(currentThreadId); - synchronized (entry) { - if (entry.getLatch().tryAcquire()) { - lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId); - } else { - final AtomicReference futureRef = new AtomicReference(); - final Runnable listener = new Runnable() { + if (entry.getLatch().tryAcquire()) { + lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId); + } else { + // waiting for message + final AtomicReference futureRef = new AtomicReference(); + final Runnable listener = new Runnable() { + @Override + public void run() { + if (futureRef.get() != null) { + futureRef.get().cancel(); + } + lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId); + } + }; + + entry.addListener(listener); + + if (ttl >= 0) { + Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override - public void run() { - if (futureRef.get() != null) { - futureRef.get().cancel(); + public void run(Timeout timeout) throws Exception { + if (entry.removeListener(listener)) { + lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId); } - lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId); } - }; - - entry.addListener(listener); - - if (ttl >= 0) { - Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - synchronized (entry) { - if (entry.removeListener(listener)) { - lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId); - } - } - } - }, ttl, TimeUnit.MILLISECONDS); - futureRef.set(scheduledFuture); - } + }, ttl, TimeUnit.MILLISECONDS); + futureRef.set(scheduledFuture); } } } @@ -768,48 +764,44 @@ public class RedissonLock extends RedissonExpirable implements RLock { // waiting for message final long current = System.currentTimeMillis(); final RedissonLockEntry entry = getEntry(currentThreadId); - synchronized (entry) { - if (entry.getLatch().tryAcquire()) { - tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId); - } else { - final AtomicBoolean executed = new AtomicBoolean(); - final AtomicReference futureRef = new AtomicReference(); - final Runnable listener = new Runnable() { - @Override - public void run() { - executed.set(true); - if (futureRef.get() != null) { - futureRef.get().cancel(); - } - - long elapsed = System.currentTimeMillis() - current; - time.addAndGet(-elapsed); - - tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId); + if (entry.getLatch().tryAcquire()) { + tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId); + } else { + final AtomicBoolean executed = new AtomicBoolean(); + final AtomicReference futureRef = new AtomicReference(); + final Runnable listener = new Runnable() { + @Override + public void run() { + executed.set(true); + if (futureRef.get() != null) { + futureRef.get().cancel(); } - }; - entry.addListener(listener); - long t = time.get(); - if (ttl >= 0 && ttl < time.get()) { - t = ttl; + long elapsed = System.currentTimeMillis() - current; + time.addAndGet(-elapsed); + + tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId); } - if (!executed.get()) { - Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - synchronized (entry) { - if (entry.removeListener(listener)) { - long elapsed = System.currentTimeMillis() - current; - time.addAndGet(-elapsed); - - tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId); - } - } + }; + entry.addListener(listener); + + long t = time.get(); + if (ttl >= 0 && ttl < time.get()) { + t = ttl; + } + if (!executed.get()) { + Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (entry.removeListener(listener)) { + long elapsed = System.currentTimeMillis() - current; + time.addAndGet(-elapsed); + + tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId); } - }, t, TimeUnit.MILLISECONDS); - futureRef.set(scheduledFuture); - } + } + }, t, TimeUnit.MILLISECONDS); + futureRef.set(scheduledFuture); } } } diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java index ae288171a..c09b62061 100644 --- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java @@ -213,40 +213,17 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen // waiting for message final long current = System.currentTimeMillis(); final RedissonLockEntry entry = getEntry(); - synchronized (entry) { - if (entry.getLatch().tryAcquire()) { - tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit); - } else { - final AtomicReference waitTimeoutFutureRef = new AtomicReference(); - - final Timeout scheduledFuture; - if (nearestTimeout != null) { - scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - if (waitTimeoutFutureRef.get() != null && !waitTimeoutFutureRef.get().cancel()) { - return; - } - - long elapsed = System.currentTimeMillis() - current; - time.addAndGet(-elapsed); - - tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit); - } - }, nearestTimeout, TimeUnit.MILLISECONDS); - } else { - scheduledFuture = null; - } + if (entry.getLatch().tryAcquire()) { + tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit); + } else { + final AtomicReference waitTimeoutFutureRef = new AtomicReference(); - final Runnable listener = new Runnable() { + final Timeout scheduledFuture; + if (nearestTimeout != null) { + scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override - public void run() { + public void run(Timeout timeout) throws Exception { if (waitTimeoutFutureRef.get() != null && !waitTimeoutFutureRef.get().cancel()) { - entry.getLatch().release(); - return; - } - if (scheduledFuture != null && !scheduledFuture.cancel()) { - entry.getLatch().release(); return; } @@ -255,29 +232,48 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit); } - }; - entry.addListener(listener); + }, nearestTimeout, TimeUnit.MILLISECONDS); + } else { + scheduledFuture = null; + } - long t = time.get(); - Timeout waitTimeoutFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - if (scheduledFuture != null && !scheduledFuture.cancel()) { - return; - } + final Runnable listener = new Runnable() { + @Override + public void run() { + if (waitTimeoutFutureRef.get() != null && !waitTimeoutFutureRef.get().cancel()) { + entry.getLatch().release(); + return; + } + if (scheduledFuture != null && !scheduledFuture.cancel()) { + entry.getLatch().release(); + return; + } + + long elapsed = System.currentTimeMillis() - current; + time.addAndGet(-elapsed); - synchronized (entry) { - if (entry.removeListener(listener)) { - long elapsed = System.currentTimeMillis() - current; - time.addAndGet(-elapsed); - - tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit); - } - } + tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit); + } + }; + entry.addListener(listener); + + long t = time.get(); + Timeout waitTimeoutFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (scheduledFuture != null && !scheduledFuture.cancel()) { + return; } - }, t, TimeUnit.MILLISECONDS); - waitTimeoutFutureRef.set(waitTimeoutFuture); - } + + if (entry.removeListener(listener)) { + long elapsed = System.currentTimeMillis() - current; + time.addAndGet(-elapsed); + + tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit); + } + } + }, t, TimeUnit.MILLISECONDS); + waitTimeoutFutureRef.set(waitTimeoutFuture); } } }); @@ -318,34 +314,32 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen } final RedissonLockEntry entry = getEntry(); - synchronized (entry) { - if (entry.getLatch().tryAcquire(permits)) { - acquireAsync(permits, subscribeFuture, result, ttl, timeUnit); - } else { - final Timeout scheduledFuture; - if (nearestTimeout != null) { - scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - acquireAsync(permits, subscribeFuture, result, ttl, timeUnit); - } - }, nearestTimeout, TimeUnit.MILLISECONDS); - } else { - scheduledFuture = null; - } - - Runnable listener = new Runnable() { + if (entry.getLatch().tryAcquire(permits)) { + acquireAsync(permits, subscribeFuture, result, ttl, timeUnit); + } else { + final Timeout scheduledFuture; + if (nearestTimeout != null) { + scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override - public void run() { - if (scheduledFuture != null && !scheduledFuture.cancel()) { - entry.getLatch().release(); - return; - } + public void run(Timeout timeout) throws Exception { acquireAsync(permits, subscribeFuture, result, ttl, timeUnit); } - }; - entry.addListener(listener); + }, nearestTimeout, TimeUnit.MILLISECONDS); + } else { + scheduledFuture = null; } + + Runnable listener = new Runnable() { + @Override + public void run() { + if (scheduledFuture != null && !scheduledFuture.cancel()) { + entry.getLatch().release(); + return; + } + acquireAsync(permits, subscribeFuture, result, ttl, timeUnit); + } + }; + entry.addListener(listener); } } }); diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java index 4f5d9d086..f12f6b795 100644 --- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java @@ -180,45 +180,41 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { // waiting for message final long current = System.currentTimeMillis(); final RedissonLockEntry entry = getEntry(); - synchronized (entry) { - if (entry.getLatch().tryAcquire()) { - tryAcquireAsync(time, permits, subscribeFuture, result); - } else { - final AtomicBoolean executed = new AtomicBoolean(); - final AtomicReference futureRef = new AtomicReference(); - final Runnable listener = new Runnable() { + if (entry.getLatch().tryAcquire()) { + tryAcquireAsync(time, permits, subscribeFuture, result); + } else { + final AtomicBoolean executed = new AtomicBoolean(); + final AtomicReference futureRef = new AtomicReference(); + final Runnable listener = new Runnable() { + @Override + public void run() { + executed.set(true); + if (futureRef.get() != null && !futureRef.get().cancel()) { + entry.getLatch().release(); + return; + } + long elapsed = System.currentTimeMillis() - current; + time.addAndGet(-elapsed); + + tryAcquireAsync(time, permits, subscribeFuture, result); + } + }; + entry.addListener(listener); + + long t = time.get(); + if (!executed.get()) { + Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override - public void run() { - executed.set(true); - if (futureRef.get() != null && !futureRef.get().cancel()) { - entry.getLatch().release(); - return; + public void run(Timeout timeout) throws Exception { + if (entry.removeListener(listener)) { + long elapsed = System.currentTimeMillis() - current; + time.addAndGet(-elapsed); + + tryAcquireAsync(time, permits, subscribeFuture, result); } - long elapsed = System.currentTimeMillis() - current; - time.addAndGet(-elapsed); - - tryAcquireAsync(time, permits, subscribeFuture, result); } - }; - entry.addListener(listener); - - long t = time.get(); - if (!executed.get()) { - Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - synchronized (entry) { - if (entry.removeListener(listener)) { - long elapsed = System.currentTimeMillis() - current; - time.addAndGet(-elapsed); - - tryAcquireAsync(time, permits, subscribeFuture, result); - } - } - } - }, t, TimeUnit.MILLISECONDS); - futureRef.set(scheduledFuture); - } + }, t, TimeUnit.MILLISECONDS); + futureRef.set(scheduledFuture); } } } @@ -251,18 +247,16 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { } final RedissonLockEntry entry = getEntry(); - synchronized (entry) { - if (entry.getLatch().tryAcquire(permits)) { - acquireAsync(permits, subscribeFuture, result); - } else { - Runnable listener = new Runnable() { - @Override - public void run() { - acquireAsync(permits, subscribeFuture, result); - } - }; - entry.addListener(listener); - } + if (entry.getLatch().tryAcquire(permits)) { + acquireAsync(permits, subscribeFuture, result); + } else { + Runnable listener = new Runnable() { + @Override + public void run() { + acquireAsync(permits, subscribeFuture, result); + } + }; + entry.addListener(listener); } } }); diff --git a/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java b/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java index 1ed966325..e3e913027 100644 --- a/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java +++ b/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java @@ -35,12 +35,8 @@ public class LockPubSub extends PublishSubscribe { @Override protected void onMessage(RedissonLockEntry value, Long message) { if (message.equals(unlockMessage)) { - while (true) { - Runnable runnableToExecute = value.getListeners().poll(); - if (runnableToExecute == null) { - break; - } - + Runnable runnableToExecute = value.getListeners().poll(); + if (runnableToExecute != null) { runnableToExecute.run(); } diff --git a/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java b/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java index c76a886f2..6288b571a 100644 --- a/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java +++ b/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java @@ -32,12 +32,8 @@ public class SemaphorePubSub extends PublishSubscribe { @Override protected void onMessage(RedissonLockEntry value, Long message) { - while (true) { - Runnable runnableToExecute = value.getListeners().poll(); - if (runnableToExecute == null) { - break; - } - + Runnable runnableToExecute = value.getListeners().poll(); + if (runnableToExecute != null) { runnableToExecute.run(); } diff --git a/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java b/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java index 27c7c4240..e3f71c352 100644 --- a/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java +++ b/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java @@ -22,8 +22,8 @@ public abstract class RedissonBaseTransactionalMapTest extends BaseTest { @Test public void testFastPut() throws InterruptedException { - ExecutorService executor = Executors.newFixedThreadPool(16); - for (int i = 0; i < 500; i++) { + ExecutorService executor = Executors.newFixedThreadPool(200); + for (int i = 0; i < 2000; i++) { executor.submit(() -> { for (int j = 0; j < 100; j++) { RTransaction t = redisson.createTransaction(TransactionOptions.defaults()); @@ -35,7 +35,7 @@ public abstract class RedissonBaseTransactionalMapTest extends BaseTest { } executor.shutdown(); - assertThat(executor.awaitTermination(1, TimeUnit.MINUTES)).isTrue(); + assertThat(executor.awaitTermination(2, TimeUnit.MINUTES)).isTrue(); }