refactoring

pull/4944/head
Nikita Koksharov 2 years ago
parent b8f88dc669
commit df7fa0c51b

@ -594,7 +594,11 @@ public class MasterSlaveEntry {
return slaveBalancer.getConnection(command, client);
}
public CompletableFuture<RedisPubSubConnection> nextPubSubConnection() {
public CompletableFuture<RedisPubSubConnection> nextPubSubConnection(ClientConnectionsEntry entry) {
if (entry != null) {
return slaveBalancer.nextPubSubConnection(entry);
}
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
return pubSubConnectionPool.get();
}

@ -253,6 +253,10 @@ public class LoadBalancerManager {
return pubSubConnectionPool.get();
}
public CompletableFuture<RedisPubSubConnection> nextPubSubConnection(ClientConnectionsEntry entry) {
return pubSubConnectionPool.get(entry);
}
public boolean contains(InetSocketAddress addr) {
return getEntry(addr) != null;
}

@ -42,6 +42,10 @@ public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection>
return get(RedisCommands.SUBSCRIBE);
}
public CompletableFuture<RedisPubSubConnection> get(ClientConnectionsEntry entry) {
return get(RedisCommands.SUBSCRIBE, entry);
}
@Override
protected RedisPubSubConnection poll(ClientConnectionsEntry entry, RedisCommand<?> command) {
return entry.pollSubscribeConnection();

@ -21,6 +21,7 @@ import org.redisson.client.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.AsyncSemaphore;
@ -161,7 +162,7 @@ public class PublishSubscribeService {
List<CompletableFuture<PubSubConnectionEntry>> futures = new ArrayList<>();
for (MasterSlaveEntry entry : entrySet) {
CompletableFuture<PubSubConnectionEntry> future = subscribe(PubSubType.PSUBSCRIBE, codec, channelName, entry, ls);
CompletableFuture<PubSubConnectionEntry> future = subscribe(PubSubType.PSUBSCRIBE, codec, channelName, entry, null, ls);
futures.add(future);
}
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
@ -178,7 +179,7 @@ public class PublishSubscribeService {
return promise;
}
CompletableFuture<PubSubConnectionEntry> f = subscribe(PubSubType.PSUBSCRIBE, codec, channelName, entry, listeners);
CompletableFuture<PubSubConnectionEntry> f = subscribe(PubSubType.PSUBSCRIBE, codec, channelName, entry, null, listeners);
return f.thenApply(res -> Collections.singletonList(res));
}
@ -188,6 +189,11 @@ public class PublishSubscribeService {
|| channelName.toString().startsWith("__keyevent@"));
}
public CompletableFuture<PubSubConnectionEntry> subscribe(MasterSlaveEntry entry, ClientConnectionsEntry clientEntry,
Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) {
return subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, clientEntry, listeners);
}
public CompletableFuture<PubSubConnectionEntry> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) {
MasterSlaveEntry entry = getEntry(channelName);
if (entry == null) {
@ -196,7 +202,7 @@ public class PublishSubscribeService {
promise.completeExceptionally(ex);
return promise;
}
return subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, listeners);
return subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, null, listeners);
}
public CompletableFuture<PubSubConnectionEntry> ssubscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) {
@ -207,11 +213,11 @@ public class PublishSubscribeService {
promise.completeExceptionally(ex);
return promise;
}
return subscribe(PubSubType.SSUBSCRIBE, codec, channelName, entry, listeners);
return subscribe(PubSubType.SSUBSCRIBE, codec, channelName, entry, null, listeners);
}
private CompletableFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, ChannelName channelName,
MasterSlaveEntry entry, RedisPubSubListener<?>... listeners) {
MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, RedisPubSubListener<?>... listeners) {
CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
AsyncSemaphore lock = getSemaphore(channelName);
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
@ -226,7 +232,7 @@ public class PublishSubscribeService {
return;
}
subscribeNoTimeout(codec, channelName, entry, promise, type, lock, new AtomicInteger(), listeners);
subscribeNoTimeout(codec, channelName, entry, clientEntry, promise, type, lock, new AtomicInteger(), listeners);
timeout(promise);
});
return promise;
@ -242,7 +248,7 @@ public class PublishSubscribeService {
return promise;
}
subscribeNoTimeout(codec, new ChannelName(channelName), entry, promise,
subscribeNoTimeout(codec, new ChannelName(channelName), entry, null, promise,
PubSubType.SUBSCRIBE, semaphore, new AtomicInteger(), listeners);
return promise;
}
@ -300,12 +306,12 @@ public class PublishSubscribeService {
return;
}
subscribeNoTimeout(codec, channelName, entry, promise, type, lock, attempts, listeners);
subscribeNoTimeout(codec, channelName, entry, null, promise, type, lock, attempts, listeners);
}
private void subscribeNoTimeout(Codec codec, ChannelName channelName, MasterSlaveEntry entry,
CompletableFuture<PubSubConnectionEntry> promise, PubSubType type,
AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener<?>... listeners) {
ClientConnectionsEntry clientEntry, CompletableFuture<PubSubConnectionEntry> promise,
PubSubType type, AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener<?>... listeners) {
PubSubConnectionEntry connEntry = name2PubSubConnection.get(new PubSubKey(channelName, entry));
if (connEntry != null) {
addListeners(channelName, promise, type, lock, connEntry, listeners);
@ -325,7 +331,8 @@ public class PublishSubscribeService {
if (freeEntry == null) {
freePubSubLock.release();
CompletableFuture<RedisPubSubConnection> connectFuture = connect(codec, channelName, entry, promise, type, lock, listeners);
CompletableFuture<RedisPubSubConnection> connectFuture = connect(codec, channelName, entry,
clientEntry, promise, type, lock, listeners);
connectionManager.getServiceManager().newTimeout(t -> {
if (!connectFuture.cancel(false)
&& !connectFuture.isCompletedExceptionally()) {
@ -411,10 +418,11 @@ public class PublishSubscribeService {
}
private CompletableFuture<RedisPubSubConnection> connect(Codec codec, ChannelName channelName,
MasterSlaveEntry msEntry, CompletableFuture<PubSubConnectionEntry> promise,
MasterSlaveEntry msEntry, ClientConnectionsEntry clientEntry,
CompletableFuture<PubSubConnectionEntry> promise,
PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
CompletableFuture<RedisPubSubConnection> connFuture = msEntry.nextPubSubConnection();
CompletableFuture<RedisPubSubConnection> connFuture = msEntry.nextPubSubConnection(clientEntry);
promise.whenComplete((res, e) -> {
if (e != null) {
connFuture.completeExceptionally(e);
@ -703,7 +711,7 @@ public class PublishSubscribeService {
}
CompletableFuture<PubSubConnectionEntry> subscribeFuture =
subscribe(PubSubType.PSUBSCRIBE, subscribeCodec, channelName, entry, listeners.toArray(new RedisPubSubListener[0]));
subscribe(PubSubType.PSUBSCRIBE, subscribeCodec, channelName, entry, null, listeners.toArray(new RedisPubSubListener[0]));
subscribeFuture.whenComplete((res, e) -> {
if (e != null) {
connectionManager.getServiceManager().newTimeout(task -> {

Loading…
Cancel
Save