Fixed - threads waiting for RSemaphore permits acquisition unable to acquire them if permits added. #3225

pull/3794/head
Nikita Koksharov 4 years ago
parent f1bdf520c2
commit 904c3ac973

@ -497,8 +497,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
"if (value == false) then "
+ "value = 0;"
+ "end;"
+ "redis.call('set', KEYS[1], value + ARGV[1]); ",
Arrays.<Object>asList(getRawName(), getChannelName()), permits);
+ "redis.call('set', KEYS[1], value + ARGV[1]); "
+ "redis.call('publish', KEYS[2], value + ARGV[1]); ",
Arrays.asList(getRawName(), getChannelName()), permits);
}

@ -3,6 +3,8 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -13,6 +15,32 @@ import org.redisson.api.RSemaphore;
public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testAcquireAfterAddPermits() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
CountDownLatch l = new CountDownLatch(1);
Thread t1 = new Thread(() -> {
s.addPermits(1);
try {
s.acquire(2);
l.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
t1.join(1000);
assertThat(l.await(1, TimeUnit.SECONDS)).isFalse();
s.acquire();
Thread.sleep(1000);
s.release();
assertThat(l.await(1, TimeUnit.SECONDS)).isFalse();
s.addPermits(1);
assertThat(l.await(1, TimeUnit.SECONDS)).isTrue();
}
@Test
public void testZero() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");

Loading…
Cancel
Save