RSemaphore, RPermitExpirableSemaphore miss release messages and wait a defined timeout before a next attempt or hang #5762

RLock, RFencedLock, RReadWriteLock miss unlock messages and wait a defined timeout before a next attempt or hang #5761
pull/5708/head^2
Nikita Koksharov 11 months ago
parent c06826fe40
commit 2b6f2fda79

@ -56,14 +56,31 @@ public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {
public void addListener(Runnable listener) {
listeners.add(listener);
if (latch.tryAcquire()) {
tryRunListener();
}
}
public boolean removeListener(Runnable listener) {
return listeners.remove(listener);
public void tryRunListener() {
Runnable runnableToExecute = listeners.poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
}
public ConcurrentLinkedQueue<Runnable> getListeners() {
return listeners;
public void tryRunAllListeners() {
while (true) {
Runnable runnableToExecute = listeners.poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
}
public boolean removeListener(Runnable listener) {
return listeners.remove(listener);
}
public Semaphore getLatch() {

@ -41,20 +41,11 @@ public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
value.tryRunListener();
value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
value.tryRunAllListeners();
value.getLatch().release(value.getLatch().getQueueLength());
}

@ -37,10 +37,7 @@ public class SemaphorePubSub extends PublishSubscribe<RedissonLockEntry> {
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
value.tryRunListener();
value.getLatch().release(Math.min(value.acquired(), message.intValue()));
}

Loading…
Cancel
Save