RLock, RSemaphore, RCountDownLatch can blocks forever under some conditions. #543

pull/555/head
Nikita 9 years ago
parent 1aaa941271
commit 1190cc5246

@ -346,9 +346,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(сonnEntry);
lock.release();
}
});
lock.release();
return;
}
@ -374,9 +374,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(oldEntry);
lock.release();
}
});
lock.release();
return;
}
@ -433,9 +433,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(oldEntry);
lock.release();
}
});
lock.release();
return;
}
@ -469,7 +469,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
Codec entryCodec = entry.getConnection().getChannels().get(channelName);
freePubSubLock.acquireUninterruptibly();
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
@ -479,7 +478,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (entry.release() == 1) {
freePubSubConnections.add(entry);
}
freePubSubLock.release();
lock.release();
return true;
@ -513,7 +511,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
Codec entryCodec = entry.getConnection().getPatternChannels().get(channelName);
freePubSubLock.acquireUninterruptibly();
entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
@ -523,7 +520,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (entry.release() == 1) {
freePubSubConnections.add(entry);
}
freePubSubLock.release();
lock.release();
return true;

@ -39,11 +39,13 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
if (entry.release() == 0) {
// just an assertion
boolean removed = entries.remove(entryName) == entry;
if (removed) {
connectionManager.unsubscribe(channelName, semaphore);
if (!removed) {
throw new IllegalStateException();
}
connectionManager.unsubscribe(channelName, semaphore);
} else {
semaphore.release();
}
semaphore.release();
}
public E getEntry(String entryName) {

Loading…
Cancel
Save