refactoring

pull/5564/head
Nikita Koksharov 1 year ago
parent bbc20bbd49
commit 4003686671

@ -17,14 +17,16 @@ package org.redisson.pubsub;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.redisson.misc.WrappedLock;
import org.redisson.PubSubMessageListener;
import org.redisson.PubSubPatternMessageListener;
import org.redisson.client.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ServiceManager;
import org.redisson.misc.AsyncSemaphore;
import org.redisson.misc.WrappedLock;
import java.util.EventListener;
import java.util.LinkedList;
@ -54,11 +56,13 @@ public class PubSubConnectionEntry {
private static final Queue<RedisPubSubListener<?>> EMPTY_QUEUE = new LinkedList<>();
private final ServiceManager serviceManager;
private final PublishSubscribeService subscribeService;
public PubSubConnectionEntry(RedisPubSubConnection conn, ServiceManager serviceManager) {
public PubSubConnectionEntry(RedisPubSubConnection conn, ConnectionManager connectionManager) {
super();
this.conn = conn;
this.serviceManager = serviceManager;
this.serviceManager = connectionManager.getServiceManager();
this.subscribeService = connectionManager.getSubscribeService();
this.subscribedChannelsAmount = new AtomicInteger(serviceManager.getConfig().getSubscriptionsPerConnection());
}
@ -161,9 +165,31 @@ public class PubSubConnectionEntry {
return subscribedChannelsAmount.get() == serviceManager.getConfig().getSubscriptionsPerConnection();
}
public void subscribe(Codec codec, PubSubType type, ChannelName channelName, CompletableFuture<Void> subscribeFuture) {
ChannelFuture future;
public void subscribe(Codec codec, ChannelName channelName, CompletableFuture<PubSubConnectionEntry> pm,
PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>[] listeners) {
CompletableFuture<PubSubConnectionEntry> pp = new CompletableFuture<>();
pp.whenComplete((r, e) -> {
if (e != null) {
PubSubType unsubscribeType = PublishSubscribeService.SUBSCRIBE2UNSUBSCRIBE.get(type);
CompletableFuture<Codec> f = subscribeService.unsubscribe(channelName, unsubscribeType);
f.whenComplete((rr, ee) -> {
pm.completeExceptionally(e);
});
return;
}
pm.complete(r);
});
CompletableFuture<Void> subscribeFuture = addListeners(channelName, pp, type, lock, listeners);
CompletableFuture<Void> promise = new CompletableFuture<>();
promise.whenComplete((r, ex) -> {
if (ex != null) {
subscribeFuture.completeExceptionally(ex);
}
});
ChannelFuture future;
if (PubSubType.SUBSCRIBE == type) {
future = conn.subscribe(promise, codec, channelName);
} else if (PubSubType.SSUBSCRIBE == type) {
@ -171,13 +197,6 @@ public class PubSubConnectionEntry {
} else {
future = conn.psubscribe(promise, codec, channelName);
}
promise.whenComplete((r, ex) -> {
if (ex != null) {
subscribeFuture.completeExceptionally(ex);
}
});
future.addListener((ChannelFutureListener) future1 -> {
if (!future1.isSuccess()) {
subscribeFuture.completeExceptionally(future1.cause());
@ -192,7 +211,7 @@ public class PubSubConnectionEntry {
});
}
public SubscribeListener getSubscribeFuture(ChannelName channel, PubSubType type) {
private SubscribeListener getSubscribeFuture(ChannelName channel, PubSubType type) {
return subscribeChannelListeners.computeIfAbsent(channel, k -> {
SubscribeListener listener = new SubscribeListener(channel, type);
conn.addListener(listener);
@ -257,4 +276,41 @@ public class PubSubConnectionEntry {
public String toString() {
return "PubSubConnectionEntry [subscribedChannelsAmount=" + subscribedChannelsAmount + ", conn=" + conn + "]";
}
public CompletableFuture<Void> addListeners(ChannelName channelName,
CompletableFuture<PubSubConnectionEntry> promise,
PubSubType type, AsyncSemaphore lock,
RedisPubSubListener<?>... listeners) {
for (RedisPubSubListener<?> listener : listeners) {
addListener(channelName, listener);
}
SubscribeListener list = getSubscribeFuture(channelName, type);
CompletableFuture<Void> subscribeFuture = list.getSuccessFuture();
subscribeFuture.whenComplete((res, e) -> {
if (e != null) {
promise.completeExceptionally(e);
lock.release();
return;
}
if (!promise.complete(this)) {
for (RedisPubSubListener<?> listener : listeners) {
removeListener(channelName, listener);
}
if (!hasListeners(channelName)) {
subscribeService.unsubscribeLocked(type, channelName)
.whenComplete((r, ex) -> {
lock.release();
});
} else {
lock.release();
}
} else {
lock.release();
}
});
return subscribeFuture;
}
}

@ -115,7 +115,13 @@ public class PublishSubscribeService {
private boolean shardingSupported = false;
private final Map<PubSubType, PubSubType> subscribe2unsubscribe = new HashMap<>();
public static final Map<PubSubType, PubSubType> SUBSCRIBE2UNSUBSCRIBE = new HashMap<>();
static {
SUBSCRIBE2UNSUBSCRIBE.put(PubSubType.SUBSCRIBE, PubSubType.UNSUBSCRIBE);
SUBSCRIBE2UNSUBSCRIBE.put(PubSubType.SSUBSCRIBE, PubSubType.SUNSUBSCRIBE);
SUBSCRIBE2UNSUBSCRIBE.put(PubSubType.PSUBSCRIBE, PubSubType.PUNSUBSCRIBE);
}
public PublishSubscribeService(ConnectionManager connectionManager) {
super();
@ -124,10 +130,6 @@ public class PublishSubscribeService {
for (int i = 0; i < locks.length; i++) {
locks[i] = new AsyncSemaphore(1);
}
subscribe2unsubscribe.put(PubSubType.SUBSCRIBE, PubSubType.UNSUBSCRIBE);
subscribe2unsubscribe.put(PubSubType.SSUBSCRIBE, PubSubType.SUNSUBSCRIBE);
subscribe2unsubscribe.put(PubSubType.PSUBSCRIBE, PubSubType.PUNSUBSCRIBE);
}
public LockPubSub getLockPubSub() {
@ -370,7 +372,7 @@ public class PublishSubscribeService {
PubSubType type, AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener<?>... listeners) {
PubSubConnectionEntry connEntry = name2PubSubConnection.get(new PubSubKey(channelName, entry));
if (connEntry != null) {
addListeners(channelName, promise, type, lock, connEntry, listeners);
connEntry.addListeners(channelName, promise, type, lock, listeners);
return;
}
@ -401,7 +403,7 @@ public class PublishSubscribeService {
freeEntry.release();
freePubSubLock.release();
addListeners(channelName, promise, type, lock, oldEntry, listeners);
oldEntry.addListeners(channelName, promise, type, lock, listeners);
return;
}
@ -413,21 +415,7 @@ public class PublishSubscribeService {
}
freePubSubLock.release();
CompletableFuture<PubSubConnectionEntry> pp = new CompletableFuture<>();
pp.whenComplete((r, e) -> {
if (e != null) {
PubSubType unsubscribeType = subscribe2unsubscribe.get(type);
CompletableFuture<Codec> f = unsubscribe(channelName, unsubscribeType);
f.whenComplete((rr, ee) -> {
promise.completeExceptionally(e);
});
return;
}
promise.complete(r);
});
CompletableFuture<Void> subscribeFuture = addListeners(channelName, pp, type, lock, freeEntry, listeners);
freeEntry.subscribe(codec, type, channelName, subscribeFuture);
freeEntry.subscribe(codec, channelName, promise, type, lock, listeners);
});
}
@ -436,41 +424,6 @@ public class PublishSubscribeService {
return connectionManager.getWriteEntry(slot);
}
private CompletableFuture<Void> addListeners(ChannelName channelName, CompletableFuture<PubSubConnectionEntry> promise,
PubSubType type, AsyncSemaphore lock, PubSubConnectionEntry connEntry,
RedisPubSubListener<?>... listeners) {
for (RedisPubSubListener<?> listener : listeners) {
connEntry.addListener(channelName, listener);
}
SubscribeListener list = connEntry.getSubscribeFuture(channelName, type);
CompletableFuture<Void> subscribeFuture = list.getSuccessFuture();
subscribeFuture.whenComplete((res, e) -> {
if (e != null) {
promise.completeExceptionally(e);
lock.release();
return;
}
if (!promise.complete(connEntry)) {
for (RedisPubSubListener<?> listener : listeners) {
connEntry.removeListener(channelName, listener);
}
if (!connEntry.hasListeners(channelName)) {
unsubscribeLocked(type, channelName)
.whenComplete((r, ex) -> {
lock.release();
});
} else {
lock.release();
}
} else {
lock.release();
}
});
return subscribeFuture;
}
private void connect(Codec codec, ChannelName channelName,
MasterSlaveEntry msEntry, ClientConnectionsEntry clientEntry,
CompletableFuture<PubSubConnectionEntry> promise,
@ -495,17 +448,17 @@ public class PublishSubscribeService {
connFuture.thenAccept(conn -> {
freePubSubLock.acquire().thenAccept(c -> {
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, connectionManager.getServiceManager());
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, connectionManager);
int remainFreeAmount = entry.tryAcquire();
PubSubKey key = new PubSubKey(channelName, msEntry);
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(key, entry);
if (oldEntry != null) {
msEntry.returnPubSubConnection(conn);
msEntry.returnPubSubConnection(entry.getConnection());
freePubSubLock.release();
addListeners(channelName, promise, type, lock, oldEntry, listeners);
oldEntry.addListeners(channelName, promise, type, lock, listeners);
return;
}
@ -518,21 +471,7 @@ public class PublishSubscribeService {
}
freePubSubLock.release();
CompletableFuture<PubSubConnectionEntry> pp = new CompletableFuture<>();
pp.whenComplete((r, e) -> {
if (e != null) {
PubSubType unsubscribeType = subscribe2unsubscribe.get(type);
CompletableFuture<Codec> f = unsubscribe(channelName, unsubscribeType);
f.whenComplete((rr, ee) -> {
promise.completeExceptionally(e);
});
return;
}
promise.complete(r);
});
CompletableFuture<Void> subscribeFuture = addListeners(channelName, pp, type, lock, entry, listeners);
entry.subscribe(codec, type, channelName, subscribeFuture);
entry.subscribe(codec, channelName, promise, type, lock, listeners);
});
});
}

Loading…
Cancel
Save