Fixed - RSemaphore permits can't be acquired due to "Maximum permit count exceeded" error #3573

pull/3588/merge
Nikita Koksharov 4 years ago
parent 383237229a
commit 7c3a56f877

@ -86,7 +86,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
return;
}
future.getNow().getLatch().acquire(permits);
future.getNow().getLatch().acquire();
}
} finally {
unsubscribe(future);
@ -230,7 +230,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
RedissonLockEntry entry = subscribeFuture.getNow();
if (entry.getLatch().tryAcquire(permits)) {
if (entry.getLatch().tryAcquire()) {
acquireAsync(permits, subscribeFuture, result);
} else {
entry.addListener(() -> {
@ -319,7 +319,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
// waiting for message
current = System.currentTimeMillis();
future.getNow().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS);
future.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
time -= System.currentTimeMillis() - current;
if (time <= 0) {

@ -40,8 +40,8 @@ public class SemaphorePubSub extends PublishSubscribe<RedissonLockEntry> {
if (runnableToExecute != null) {
runnableToExecute.run();
}
value.getLatch().release(message.intValue());
value.getLatch().release(Math.min(value.getLatch().getQueueLength(), message.intValue()));
}
}

@ -3,6 +3,7 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -224,6 +225,39 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
assertThat(lockedCounter.get()).isEqualTo(iterations);
}
@Test
public void testConcurrencyLoopMax_MultiInstance() throws InterruptedException {
final int iterations = 10;
final AtomicInteger lockedCounter = new AtomicInteger();
RSemaphore s = redisson.getSemaphore("test");
s.trySetPermits(Integer.MAX_VALUE);
testMultiInstanceConcurrency(4, r -> {
for (int i = 0; i < iterations; i++) {
int v = Integer.MAX_VALUE;
if (ThreadLocalRandom.current().nextBoolean()) {
v = 1;
}
try {
r.getSemaphore("test").acquire(v);
}catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
lockedCounter.incrementAndGet();
r.getSemaphore("test").release(v);
}
});
assertThat(lockedCounter.get()).isEqualTo(4 * iterations);
}
@Test
public void testConcurrencyLoop_MultiInstance() throws InterruptedException {
final int iterations = 100;

Loading…
Cancel
Save