|
|
@ -62,7 +62,7 @@ public class PublishSubscribeService {
|
|
|
|
|
|
|
|
|
|
|
|
private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();
|
|
|
|
private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
|
|
|
|
private final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
|
|
|
|
private final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<>();
|
|
|
|
|
|
|
|
|
|
|
|
private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
|
|
|
|
private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
|
|
|
|
|
|
|
|
|
|
|
@ -109,9 +109,9 @@ public class PublishSubscribeService {
|
|
|
|
return subscribe(PubSubType.SUBSCRIBE, codec, channelName, new RedissonPromise<PubSubConnectionEntry>(), listeners);
|
|
|
|
return subscribe(PubSubType.SUBSCRIBE, codec, channelName, new RedissonPromise<PubSubConnectionEntry>(), listeners);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private RFuture<PubSubConnectionEntry> subscribe(final PubSubType type, final Codec codec, final ChannelName channelName,
|
|
|
|
private RFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, ChannelName channelName,
|
|
|
|
final RPromise<PubSubConnectionEntry> promise, final RedisPubSubListener<?>... listeners) {
|
|
|
|
RPromise<PubSubConnectionEntry> promise, RedisPubSubListener<?>... listeners) {
|
|
|
|
final AsyncSemaphore lock = getSemaphore(channelName);
|
|
|
|
AsyncSemaphore lock = getSemaphore(channelName);
|
|
|
|
lock.acquire(new Runnable() {
|
|
|
|
lock.acquire(new Runnable() {
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
public void run() {
|
|
|
@ -120,7 +120,7 @@ public class PublishSubscribeService {
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
|
|
|
|
RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
|
|
|
|
promise.onComplete((res, e) -> {
|
|
|
|
promise.onComplete((res, e) -> {
|
|
|
|
if (e != null) {
|
|
|
|
if (e != null) {
|
|
|
|
result.tryFailure(e);
|
|
|
|
result.tryFailure(e);
|
|
|
@ -150,11 +150,11 @@ public class PublishSubscribeService {
|
|
|
|
return locks[Math.abs(channelName.hashCode() % locks.length)];
|
|
|
|
return locks[Math.abs(channelName.hashCode() % locks.length)];
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void subscribe(final Codec codec, final ChannelName channelName,
|
|
|
|
private void subscribe(Codec codec, ChannelName channelName,
|
|
|
|
final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
|
|
|
|
RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
|
|
|
|
final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
|
|
|
|
PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
|
|
|
|
if (connEntry != null) {
|
|
|
|
if (connEntry != null) {
|
|
|
|
subscribe(channelName, promise, type, lock, connEntry, listeners);
|
|
|
|
addListeners(channelName, promise, type, lock, connEntry, listeners);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -168,7 +168,7 @@ public class PublishSubscribeService {
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
|
|
|
|
PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
|
|
|
|
if (freeEntry == null) {
|
|
|
|
if (freeEntry == null) {
|
|
|
|
connect(codec, channelName, promise, type, lock, listeners);
|
|
|
|
connect(codec, channelName, promise, type, lock, listeners);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
@ -179,12 +179,12 @@ public class PublishSubscribeService {
|
|
|
|
throw new IllegalStateException();
|
|
|
|
throw new IllegalStateException();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
|
|
|
|
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
|
|
|
|
if (oldEntry != null) {
|
|
|
|
if (oldEntry != null) {
|
|
|
|
freeEntry.release();
|
|
|
|
freeEntry.release();
|
|
|
|
freePubSubLock.release();
|
|
|
|
freePubSubLock.release();
|
|
|
|
|
|
|
|
|
|
|
|
subscribe(channelName, promise, type, lock, oldEntry, listeners);
|
|
|
|
addListeners(channelName, promise, type, lock, oldEntry, listeners);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -193,7 +193,7 @@ public class PublishSubscribeService {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
freePubSubLock.release();
|
|
|
|
freePubSubLock.release();
|
|
|
|
|
|
|
|
|
|
|
|
subscribe(channelName, promise, type, lock, freeEntry, listeners);
|
|
|
|
addListeners(channelName, promise, type, lock, freeEntry, listeners);
|
|
|
|
|
|
|
|
|
|
|
|
if (PubSubType.PSUBSCRIBE == type) {
|
|
|
|
if (PubSubType.PSUBSCRIBE == type) {
|
|
|
|
freeEntry.psubscribe(codec, channelName);
|
|
|
|
freeEntry.psubscribe(codec, channelName);
|
|
|
@ -205,14 +205,14 @@ public class PublishSubscribeService {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void subscribe(final ChannelName channelName, final RPromise<PubSubConnectionEntry> promise,
|
|
|
|
private void addListeners(ChannelName channelName, RPromise<PubSubConnectionEntry> promise,
|
|
|
|
final PubSubType type, final AsyncSemaphore lock, final PubSubConnectionEntry connEntry,
|
|
|
|
PubSubType type, AsyncSemaphore lock, PubSubConnectionEntry connEntry,
|
|
|
|
final RedisPubSubListener<?>... listeners) {
|
|
|
|
RedisPubSubListener<?>... listeners) {
|
|
|
|
for (RedisPubSubListener<?> listener : listeners) {
|
|
|
|
for (RedisPubSubListener<?> listener : listeners) {
|
|
|
|
connEntry.addListener(channelName, listener);
|
|
|
|
connEntry.addListener(channelName, listener);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
SubscribeListener list = connEntry.getSubscribeFuture(channelName, type);
|
|
|
|
SubscribeListener list = connEntry.getSubscribeFuture(channelName, type);
|
|
|
|
final RFuture<Void> subscribeFuture = list.getSuccessFuture();
|
|
|
|
RFuture<Void> subscribeFuture = list.getSuccessFuture();
|
|
|
|
|
|
|
|
|
|
|
|
subscribeFuture.onComplete((res, e) -> {
|
|
|
|
subscribeFuture.onComplete((res, e) -> {
|
|
|
|
if (!promise.trySuccess(connEntry)) {
|
|
|
|
if (!promise.trySuccess(connEntry)) {
|
|
|
@ -283,14 +283,14 @@ public class PublishSubscribeService {
|
|
|
|
|
|
|
|
|
|
|
|
freePubSubLock.release();
|
|
|
|
freePubSubLock.release();
|
|
|
|
|
|
|
|
|
|
|
|
subscribe(channelName, promise, type, lock, oldEntry, listeners);
|
|
|
|
addListeners(channelName, promise, type, lock, oldEntry, listeners);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
freePubSubConnections.add(entry);
|
|
|
|
freePubSubConnections.add(entry);
|
|
|
|
freePubSubLock.release();
|
|
|
|
freePubSubLock.release();
|
|
|
|
|
|
|
|
|
|
|
|
subscribe(channelName, promise, type, lock, entry, listeners);
|
|
|
|
addListeners(channelName, promise, type, lock, entry, listeners);
|
|
|
|
|
|
|
|
|
|
|
|
if (PubSubType.PSUBSCRIBE == type) {
|
|
|
|
if (PubSubType.PSUBSCRIBE == type) {
|
|
|
|
entry.psubscribe(codec, channelName);
|
|
|
|
entry.psubscribe(codec, channelName);
|
|
|
@ -300,14 +300,14 @@ public class PublishSubscribeService {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public RFuture<Void> unsubscribe(final ChannelName channelName, final AsyncSemaphore lock) {
|
|
|
|
public RFuture<Void> unsubscribe(ChannelName channelName, AsyncSemaphore lock) {
|
|
|
|
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
|
|
|
|
PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
|
|
|
|
if (entry == null || connectionManager.isShuttingDown()) {
|
|
|
|
if (entry == null || connectionManager.isShuttingDown()) {
|
|
|
|
lock.release();
|
|
|
|
lock.release();
|
|
|
|
return RedissonPromise.newSucceededFuture(null);
|
|
|
|
return RedissonPromise.newSucceededFuture(null);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
final RedissonPromise<Void> result = new RedissonPromise<Void>();
|
|
|
|
RedissonPromise<Void> result = new RedissonPromise<Void>();
|
|
|
|
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
|
|
|
|
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
@ -329,17 +329,17 @@ public class PublishSubscribeService {
|
|
|
|
return result;
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public RFuture<Codec> unsubscribe(final ChannelName channelName, final PubSubType topicType) {
|
|
|
|
public RFuture<Codec> unsubscribe(ChannelName channelName, PubSubType topicType) {
|
|
|
|
if (connectionManager.isShuttingDown()) {
|
|
|
|
if (connectionManager.isShuttingDown()) {
|
|
|
|
return RedissonPromise.newSucceededFuture(null);
|
|
|
|
return RedissonPromise.newSucceededFuture(null);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
final RPromise<Codec> result = new RedissonPromise<Codec>();
|
|
|
|
RPromise<Codec> result = new RedissonPromise<>();
|
|
|
|
final AsyncSemaphore lock = getSemaphore(channelName);
|
|
|
|
AsyncSemaphore lock = getSemaphore(channelName);
|
|
|
|
lock.acquire(new Runnable() {
|
|
|
|
lock.acquire(new Runnable() {
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
public void run() {
|
|
|
|
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
|
|
|
|
PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
|
|
|
|
if (entry == null) {
|
|
|
|
if (entry == null) {
|
|
|
|
lock.release();
|
|
|
|
lock.release();
|
|
|
|
result.trySuccess(null);
|
|
|
|
result.trySuccess(null);
|
|
|
@ -352,7 +352,7 @@ public class PublishSubscribeService {
|
|
|
|
freePubSubConnections.remove(entry);
|
|
|
|
freePubSubConnections.remove(entry);
|
|
|
|
freePubSubLock.release();
|
|
|
|
freePubSubLock.release();
|
|
|
|
|
|
|
|
|
|
|
|
final Codec entryCodec;
|
|
|
|
Codec entryCodec;
|
|
|
|
if (topicType == PubSubType.PUNSUBSCRIBE) {
|
|
|
|
if (topicType == PubSubType.PUNSUBSCRIBE) {
|
|
|
|
entryCodec = entry.getConnection().getPatternChannels().get(channelName);
|
|
|
|
entryCodec = entry.getConnection().getPatternChannels().get(channelName);
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
@ -385,8 +385,8 @@ public class PublishSubscribeService {
|
|
|
|
return result;
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public void punsubscribe(final ChannelName channelName, final AsyncSemaphore lock) {
|
|
|
|
public void punsubscribe(ChannelName channelName, AsyncSemaphore lock) {
|
|
|
|
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
|
|
|
|
PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
|
|
|
|
if (entry == null) {
|
|
|
|
if (entry == null) {
|
|
|
|
lock.release();
|
|
|
|
lock.release();
|
|
|
|
return;
|
|
|
|
return;
|
|
|
@ -444,8 +444,8 @@ public class PublishSubscribeService {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void subscribe(final ChannelName channelName, final Collection<RedisPubSubListener<?>> listeners,
|
|
|
|
private void subscribe(ChannelName channelName, Collection<RedisPubSubListener<?>> listeners,
|
|
|
|
final Codec subscribeCodec) {
|
|
|
|
Codec subscribeCodec) {
|
|
|
|
RFuture<PubSubConnectionEntry> subscribeFuture = subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[listeners.size()]));
|
|
|
|
RFuture<PubSubConnectionEntry> subscribeFuture = subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[listeners.size()]));
|
|
|
|
subscribeFuture.onComplete((res, e) -> {
|
|
|
|
subscribeFuture.onComplete((res, e) -> {
|
|
|
|
if (e != null) {
|
|
|
|
if (e != null) {
|
|
|
@ -457,8 +457,8 @@ public class PublishSubscribeService {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void psubscribe(final ChannelName channelName, final Collection<RedisPubSubListener<?>> listeners,
|
|
|
|
private void psubscribe(ChannelName channelName, Collection<RedisPubSubListener<?>> listeners,
|
|
|
|
final Codec subscribeCodec) {
|
|
|
|
Codec subscribeCodec) {
|
|
|
|
RFuture<PubSubConnectionEntry> subscribeFuture = psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()]));
|
|
|
|
RFuture<PubSubConnectionEntry> subscribeFuture = psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()]));
|
|
|
|
subscribeFuture.onComplete((res, e) -> {
|
|
|
|
subscribeFuture.onComplete((res, e) -> {
|
|
|
|
if (e != null) {
|
|
|
|
if (e != null) {
|
|
|
|