From 8d347ad22be930b9a92fd55724a5c90440fa0a3c Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 1 Jun 2018 10:38:43 +0300 Subject: [PATCH] Fixed - methods belongs to transactional objects get blocked at high concurrency. #1459 --- .../java/org/redisson/pubsub/LockPubSub.java | 23 +++++------------- .../org/redisson/pubsub/SemaphorePubSub.java | 23 +++++------------- .../RedissonBaseTransactionalMapTest.java | 24 ++++++++++++++++++- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java b/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java index 08b023799..1ed966325 100644 --- a/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java +++ b/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java @@ -35,27 +35,16 @@ public class LockPubSub extends PublishSubscribe { @Override protected void onMessage(RedissonLockEntry value, Long message) { if (message.equals(unlockMessage)) { - value.getLatch().release(); - while (true) { - Runnable runnableToExecute = null; - synchronized (value) { - Runnable runnable = value.getListeners().poll(); - if (runnable != null) { - if (value.getLatch().tryAcquire()) { - runnableToExecute = runnable; - } else { - value.addListener(runnable); - } - } + Runnable runnableToExecute = value.getListeners().poll(); + if (runnableToExecute == null) { + break; } - if (runnableToExecute != null) { - runnableToExecute.run(); - } else { - return; - } + runnableToExecute.run(); } + + value.getLatch().release(); } } diff --git a/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java b/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java index 31268d737..c76a886f2 100644 --- a/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java +++ b/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java @@ -32,27 +32,16 @@ public class SemaphorePubSub extends PublishSubscribe { @Override protected void onMessage(RedissonLockEntry value, Long message) { - value.getLatch().release(message.intValue()); - while (true) { - Runnable runnableToExecute = null; - synchronized (value) { - Runnable runnable = value.getListeners().poll(); - if (runnable != null) { - if (value.getLatch().tryAcquire()) { - runnableToExecute = runnable; - } else { - value.addListener(runnable); - } - } + Runnable runnableToExecute = value.getListeners().poll(); + if (runnableToExecute == null) { + break; } - if (runnableToExecute != null) { - runnableToExecute.run(); - } else { - return; - } + runnableToExecute.run(); } + + value.getLatch().release(message.intValue()); } } diff --git a/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java b/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java index 6e9d521aa..27c7c4240 100644 --- a/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java +++ b/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java @@ -1,9 +1,12 @@ package org.redisson.transaction; -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.redisson.BaseTest; @@ -17,6 +20,25 @@ public abstract class RedissonBaseTransactionalMapTest extends BaseTest { protected abstract RMap getTransactionalMap(RTransaction transaction); + @Test + public void testFastPut() throws InterruptedException { + ExecutorService executor = Executors.newFixedThreadPool(16); + for (int i = 0; i < 500; i++) { + executor.submit(() -> { + for (int j = 0; j < 100; j++) { + RTransaction t = redisson.createTransaction(TransactionOptions.defaults()); + RMap map = getTransactionalMap(t); + map.fastPut("1", "1"); + t.commit(); + } + }); + } + + executor.shutdown(); + assertThat(executor.awaitTermination(1, TimeUnit.MINUTES)).isTrue(); + } + + @Test public void testPutAll() { RMap m = getMap();