|
|
|
@ -197,7 +197,7 @@ public class PublishSubscribeService {
|
|
|
|
|
"Unable to acquire subscription lock after " + timeout + "ms. " +
|
|
|
|
|
"Try to increase 'timeout', 'subscriptionsPerConnection', 'subscriptionConnectionPoolSize' parameters."));
|
|
|
|
|
}, timeout, TimeUnit.MILLISECONDS);
|
|
|
|
|
lock.acquire(() -> {
|
|
|
|
|
lock.acquire().thenAccept(r -> {
|
|
|
|
|
if (!lockTimeout.cancel() || promise.isDone()) {
|
|
|
|
|
lock.release();
|
|
|
|
|
return;
|
|
|
|
@ -257,7 +257,7 @@ public class PublishSubscribeService {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
freePubSubLock.acquire(() -> {
|
|
|
|
|
freePubSubLock.acquire().thenAccept(c -> {
|
|
|
|
|
if (promise.isDone()) {
|
|
|
|
|
lock.release();
|
|
|
|
|
freePubSubLock.release();
|
|
|
|
@ -390,7 +390,7 @@ public class PublishSubscribeService {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
freePubSubLock.acquire(() -> {
|
|
|
|
|
freePubSubLock.acquire().thenAccept(c -> {
|
|
|
|
|
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, connectionManager);
|
|
|
|
|
int remainFreeAmount = entry.tryAcquire();
|
|
|
|
|
|
|
|
|
@ -553,7 +553,7 @@ public class PublishSubscribeService {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
freePubSubLock.acquire(() -> {
|
|
|
|
|
freePubSubLock.acquire().thenAccept(r -> {
|
|
|
|
|
e.getValue().getEntries().remove(entry);
|
|
|
|
|
freePubSubLock.release();
|
|
|
|
|
});
|
|
|
|
|