refactoring

pull/4308/head
Nikita Koksharov 3 years ago
parent d706b576eb
commit 43e7ef2446

@ -28,10 +28,7 @@ import org.redisson.connection.ConnectionManager;
import java.util.EventListener; import java.util.EventListener;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -159,12 +156,26 @@ public class PubSubConnectionEntry {
return subscribedChannelsAmount.incrementAndGet(); return subscribedChannelsAmount.incrementAndGet();
} }
public ChannelFuture subscribe(Codec codec, ChannelName channelName) { public void subscribe(Codec codec, PubSubType type, ChannelName channelName, CompletableFuture<Void> subscribeFuture) {
return conn.subscribe(codec, channelName); ChannelFuture future;
} if (PubSubType.PSUBSCRIBE == type) {
future = conn.psubscribe(codec, channelName);
} else {
future = conn.subscribe(codec, channelName);
}
public ChannelFuture psubscribe(Codec codec, ChannelName pattern) { future.addListener((ChannelFutureListener) future1 -> {
return conn.psubscribe(codec, pattern); if (!future1.isSuccess()) {
subscribeFuture.completeExceptionally(future1.cause());
return;
}
connectionManager.newTimeout(t -> {
subscribeFuture.completeExceptionally(new RedisTimeoutException(
"Subscription timeout after " + connectionManager.getConfig().getTimeout() + "ms. " +
"Check network and/or increase 'timeout' parameter."));
}, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
});
} }
public SubscribeListener getSubscribeFuture(ChannelName channel, PubSubType type) { public SubscribeListener getSubscribeFuture(ChannelName channel, PubSubType type) {

@ -191,7 +191,7 @@ public class PublishSubscribeService {
Timeout lockTimeout = connectionManager.newTimeout(t -> { Timeout lockTimeout = connectionManager.newTimeout(t -> {
promise.completeExceptionally(new RedisTimeoutException( promise.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + timeout + "ms. " + "Unable to acquire subscription lock after " + timeout + "ms. " +
"Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.")); "Try to increase 'timeout', 'subscriptionsPerConnection', 'subscriptionConnectionPoolSize' parameters."));
}, timeout, TimeUnit.MILLISECONDS); }, timeout, TimeUnit.MILLISECONDS);
lock.acquire(() -> { lock.acquire(() -> {
if (!lockTimeout.cancel() || promise.isDone()) { if (!lockTimeout.cancel() || promise.isDone()) {
@ -239,7 +239,7 @@ public class PublishSubscribeService {
Timeout task = connectionManager.newTimeout(t -> { Timeout task = connectionManager.newTimeout(t -> {
promise.completeExceptionally(new RedisTimeoutException( promise.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + timeout + "ms. " + "Unable to acquire subscription lock after " + timeout + "ms. " +
"Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.")); "Try to increase 'timeout', 'subscriptionsPerConnection', 'subscriptionConnectionPoolSize' parameters."));
}, timeout, TimeUnit.MILLISECONDS); }, timeout, TimeUnit.MILLISECONDS);
promise.whenComplete((r, e) -> { promise.whenComplete((r, e) -> {
task.cancel(); task.cancel();
@ -307,29 +307,11 @@ public class PublishSubscribeService {
freePubSubLock.release(); freePubSubLock.release();
CompletableFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners); CompletableFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);
freeEntry.subscribe(codec, type, channelName, subscribeFuture);
ChannelFuture future; subscribeFuture.whenComplete((r, e) -> {
if (PubSubType.PSUBSCRIBE == type) { if (e instanceof RedisTimeoutException) {
future = freeEntry.psubscribe(codec, channelName); unsubscribe(channelName, type);
} else {
future = freeEntry.subscribe(codec, channelName);
}
future.addListener((ChannelFutureListener) future1 -> {
if (!future1.isSuccess()) {
if (!promise.isDone()) {
subscribeFuture.cancel(false);
}
return;
} }
connectionManager.newTimeout(t -> {
if (subscribeFuture.completeExceptionally(new RedisTimeoutException(
"Subscription timeout after " + config.getTimeout() + "ms. " +
"Check network and/or increase 'timeout' parameter."))) {
unsubscribe(channelName, type);
}
}, config.getTimeout(), TimeUnit.MILLISECONDS);
}); });
}); });
} }
@ -426,27 +408,8 @@ public class PublishSubscribeService {
} }
freePubSubLock.release(); freePubSubLock.release();
addListeners(channelName, promise, type, lock, entry, listeners); CompletableFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);
entry.subscribe(codec, type, channelName, subscribeFuture);
ChannelFuture future;
if (PubSubType.PSUBSCRIBE == type) {
future = entry.psubscribe(codec, channelName);
} else {
future = entry.subscribe(codec, channelName);
}
future.addListener((ChannelFutureListener) future1 -> {
if (!future1.isSuccess()) {
if (!promise.isDone()) {
promise.cancel(false);
}
return;
}
connectionManager.newTimeout(timeout ->
promise.cancel(false),
config.getTimeout(), TimeUnit.MILLISECONDS);
});
}); });
}); });
return connFuture; return connFuture;

Loading…
Cancel
Save