From 904c3ac97331b697289fda99177d193eedbb2ff0 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 19 Aug 2021 10:48:29 +0300 Subject: [PATCH] Fixed - threads waiting for RSemaphore permits acquisition unable to acquire them if permits added. #3225 --- .../java/org/redisson/RedissonSemaphore.java | 5 ++-- .../org/redisson/RedissonSemaphoreTest.java | 28 +++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java index 96c47542a..bef049171 100644 --- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java @@ -497,8 +497,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { "if (value == false) then " + "value = 0;" + "end;" - + "redis.call('set', KEYS[1], value + ARGV[1]); ", - Arrays.asList(getRawName(), getChannelName()), permits); + + "redis.call('set', KEYS[1], value + ARGV[1]); " + + "redis.call('publish', KEYS[2], value + ARGV[1]); ", + Arrays.asList(getRawName(), getChannelName()), permits); } diff --git a/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java b/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java index 8212c0357..b52905693 100644 --- a/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java @@ -3,6 +3,8 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -13,6 +15,32 @@ import org.redisson.api.RSemaphore; public class RedissonSemaphoreTest extends BaseConcurrentTest { + @Test + public void testAcquireAfterAddPermits() throws InterruptedException { + RSemaphore s = redisson.getSemaphore("test"); + + CountDownLatch l = new CountDownLatch(1); + Thread t1 = new Thread(() -> { + s.addPermits(1); + try { + s.acquire(2); + l.countDown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + t1.start(); + t1.join(1000); + assertThat(l.await(1, TimeUnit.SECONDS)).isFalse(); + s.acquire(); + Thread.sleep(1000); + s.release(); + assertThat(l.await(1, TimeUnit.SECONDS)).isFalse(); + s.addPermits(1); + assertThat(l.await(1, TimeUnit.SECONDS)).isTrue(); + } + @Test public void testZero() throws InterruptedException { RSemaphore s = redisson.getSemaphore("test");