diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index e1ff9757e..20f3b06ea 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -16,6 +16,7 @@ package org.redisson.pubsub; import java.util.Collection; +import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -47,32 +48,34 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; /** - * + * * @author Nikita Koksharov * */ public class PublishSubscribeService { private static final Logger log = LoggerFactory.getLogger(PublishSubscribeService.class); - + private final ConnectionManager connectionManager; - + private final MasterSlaveServersConfig config; - + private final AsyncSemaphore[] locks = new AsyncSemaphore[50]; - + private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1); - + private final ConcurrentMap name2PubSubConnection = new ConcurrentHashMap<>(); - - private final Queue freePubSubConnections = new ConcurrentLinkedQueue<>(); + + private final ConcurrentMap> freePubSubMap = new ConcurrentHashMap<>(); + + private final Queue emptyQueue = new LinkedList<>(); private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this); - + private final CountDownLatchPubSub countDownLatchPubSub = new CountDownLatchPubSub(this); - + private final LockPubSub lockPubSub = new LockPubSub(this); - + public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) { super(); this.connectionManager = connectionManager; @@ -81,15 +84,15 @@ public class PublishSubscribeService { locks[i] = new AsyncSemaphore(1); } } - + public LockPubSub getLockPubSub() { return lockPubSub; } - + public CountDownLatchPubSub getCountDownLatchPubSub() { return countDownLatchPubSub; } - + public SemaphorePubSub getSemaphorePubSub() { return semaphorePubSub; } @@ -101,7 +104,7 @@ public class PublishSubscribeService { public RFuture psubscribe(ChannelName channelName, Codec codec, RedisPubSubListener... listeners) { return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, new RedissonPromise(), listeners); } - + public RFuture psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener... listeners) { RPromise promise = new RedissonPromise(); subscribe(codec, new ChannelName(channelName), promise, PubSubType.PSUBSCRIBE, semaphore, listeners); @@ -122,7 +125,7 @@ public class PublishSubscribeService { lock.release(); return; } - + RPromise result = new RedissonPromise(); promise.onComplete((res, e) -> { if (e != null) { @@ -134,7 +137,7 @@ public class PublishSubscribeService { promise.tryFailure(e); return; } - + promise.trySuccess(res); }); subscribe(codec, channelName, result, type, lock, listeners); @@ -142,7 +145,7 @@ public class PublishSubscribeService { }); return promise; } - + public RFuture subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener... listeners) { RPromise promise = new RedissonPromise(); subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners); @@ -152,8 +155,8 @@ public class PublishSubscribeService { public AsyncSemaphore getSemaphore(ChannelName channelName) { return locks[Math.abs(channelName.hashCode() % locks.length)]; } - - private void subscribe(Codec codec, ChannelName channelName, + + private void subscribe(Codec codec, ChannelName channelName, RPromise promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener... listeners) { PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName); if (connEntry != null) { @@ -170,41 +173,43 @@ public class PublishSubscribeService { freePubSubLock.release(); return; } - + + Queue freePubSubConnections = getConnectionsQueue(channelName); + PubSubConnectionEntry freeEntry = freePubSubConnections.peek(); if (freeEntry == null) { connect(codec, channelName, promise, type, lock, listeners); return; } - + int remainFreeAmount = freeEntry.tryAcquire(); if (remainFreeAmount == -1) { throw new IllegalStateException(); } - + PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry); if (oldEntry != null) { freeEntry.release(); freePubSubLock.release(); - + addListeners(channelName, promise, type, lock, oldEntry, listeners); return; } - + if (remainFreeAmount == 0) { freePubSubConnections.poll(); } freePubSubLock.release(); - + RFuture subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners); - + ChannelFuture future; if (PubSubType.PSUBSCRIBE == type) { future = freeEntry.psubscribe(codec, channelName); } else { future = freeEntry.subscribe(codec, channelName); } - + future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -214,7 +219,7 @@ public class PublishSubscribeService { } return; } - + connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { @@ -224,10 +229,16 @@ public class PublishSubscribeService { } }); } - + }); } + private Queue getConnectionsQueue(ChannelName channelName) { + int slot = connectionManager.calcSlot(channelName.getName()); + MasterSlaveEntry entry = connectionManager.getEntry(slot); + return freePubSubMap.getOrDefault(entry, emptyQueue); + } + private RFuture addListeners(ChannelName channelName, RPromise promise, PubSubType type, AsyncSemaphore lock, PubSubConnectionEntry connEntry, RedisPubSubListener... listeners) { @@ -236,7 +247,7 @@ public class PublishSubscribeService { } SubscribeListener list = connEntry.getSubscribeFuture(channelName, type); RFuture subscribeFuture = list.getSuccessFuture(); - + subscribeFuture.onComplete((res, e) -> { if (!promise.trySuccess(connEntry)) { for (RedisPubSubListener listener : listeners) { @@ -263,7 +274,7 @@ public class PublishSubscribeService { entry.returnPubSubConnection(pubSubEntry); } } - + private RFuture nextPubSubConnection(int slot) { MasterSlaveEntry entry = connectionManager.getEntry(slot); if (entry == null) { @@ -272,7 +283,7 @@ public class PublishSubscribeService { } return entry.nextPubSubConnection(); } - + private void connect(Codec codec, ChannelName channelName, RPromise promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener... listeners) { int slot = connectionManager.calcSlot(channelName.getName()); @@ -282,41 +293,41 @@ public class PublishSubscribeService { ((RPromise) connFuture).tryFailure(e); } }); - connFuture.onComplete((conn, e) -> { - if (e != null) { + connFuture.onComplete((conn, ex) -> { + if (ex != null) { freePubSubLock.release(); lock.release(); - promise.tryFailure(e); + promise.tryFailure(ex); return; } - + PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); int remainFreeAmount = entry.tryAcquire(); - + PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); if (oldEntry != null) { releaseSubscribeConnection(slot, entry); - + freePubSubLock.release(); addListeners(channelName, promise, type, lock, oldEntry, listeners); return; } - + if (remainFreeAmount > 0) { - freePubSubConnections.add(entry); + addFreeConnectionEntry(channelName, entry); } freePubSubLock.release(); - + RFuture subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners); - + ChannelFuture future; if (PubSubType.PSUBSCRIBE == type) { future = entry.psubscribe(codec, channelName); } else { future = entry.subscribe(codec, channelName); } - + future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -326,7 +337,7 @@ public class PublishSubscribeService { } return; } - + connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { @@ -337,43 +348,43 @@ public class PublishSubscribeService { }); }); } - + public RFuture unsubscribe(ChannelName channelName, AsyncSemaphore lock) { PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null || connectionManager.isShuttingDown()) { lock.release(); return RedissonPromise.newSucceededFuture(null); } - + AtomicBoolean executed = new AtomicBoolean(); RedissonPromise result = new RedissonPromise(); ChannelFuture future = entry.unsubscribe(channelName, new BaseRedisPubSubListener() { - + @Override public boolean onStatus(PubSubType type, CharSequence channel) { if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) { executed.set(true); - + if (entry.release() == 1) { - freePubSubConnections.add(entry); + addFreeConnectionEntry(channelName, entry); } - + lock.release(); result.trySuccess(null); return true; } return false; } - + }); - + future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { return; } - + connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { @@ -385,10 +396,10 @@ public class PublishSubscribeService { }, config.getTimeout(), TimeUnit.MILLISECONDS); } }); - + return result; } - + public RFuture unsubscribe(ChannelName channelName, PubSubType topicType) { if (connectionManager.isShuttingDown()) { return RedissonPromise.newSucceededFuture(null); @@ -409,16 +420,17 @@ public class PublishSubscribeService { freePubSubLock.acquire(new Runnable() { @Override public void run() { + Queue freePubSubConnections = getConnectionsQueue(channelName); freePubSubConnections.remove(entry); freePubSubLock.release(); - + Codec entryCodec; if (topicType == PubSubType.PUNSUBSCRIBE) { entryCodec = entry.getConnection().getPatternChannels().get(channelName); } else { entryCodec = entry.getConnection().getChannels().get(channelName); } - + AtomicBoolean executed = new AtomicBoolean(); RedisPubSubListener listener = new BaseRedisPubSubListener() { @@ -426,7 +438,7 @@ public class PublishSubscribeService { public boolean onStatus(PubSubType type, CharSequence channel) { if (type == topicType && channel.equals(channelName)) { executed.set(true); - + lock.release(); result.trySuccess(entryCodec); return true; @@ -442,14 +454,14 @@ public class PublishSubscribeService { } else { future = entry.unsubscribe(channelName, listener); } - + future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { return; } - + connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { @@ -465,50 +477,59 @@ public class PublishSubscribeService { }); } }); - + return result; } - + public void punsubscribe(ChannelName channelName, AsyncSemaphore lock) { PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null || connectionManager.isShuttingDown()) { lock.release(); return; } - + entry.punsubscribe(channelName, new BaseRedisPubSubListener() { - + @Override public boolean onStatus(PubSubType type, CharSequence channel) { if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) { - + if (entry.release() == 1) { - freePubSubConnections.add(entry); + addFreeConnectionEntry(channelName, entry); } - + lock.release(); return true; } return false; } - + }); } - + + private void addFreeConnectionEntry(ChannelName channelName, PubSubConnectionEntry entry) { + int slot = connectionManager.calcSlot(channelName.getName()); + MasterSlaveEntry me = connectionManager.getEntry(slot); + Queue freePubSubConnections = freePubSubMap.computeIfAbsent(me, e -> new ConcurrentLinkedQueue<>()); + freePubSubConnections.add(entry); + } + public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) { - for (PubSubConnectionEntry entry : freePubSubConnections) { - if (entry.getConnection().equals(redisPubSubConnection)) { - freePubSubLock.acquire(new Runnable() { - @Override - public void run() { - freePubSubConnections.remove(entry); - freePubSubLock.release(); - } - }); - break; + for (Queue queue : freePubSubMap.values()) { + for (PubSubConnectionEntry entry : queue) { + if (entry.getConnection().equals(redisPubSubConnection)) { + freePubSubLock.acquire(new Runnable() { + @Override + public void run() { + queue.remove(entry); + freePubSubLock.release(); + } + }); + break; + } } } - + for (ChannelName channelName : redisPubSubConnection.getChannels().keySet()) { PubSubConnectionEntry pubSubEntry = getPubSubEntry(channelName); Collection> listeners = pubSubEntry.getListeners(channelName); @@ -527,12 +548,12 @@ public class PublishSubscribeService { if (listeners.isEmpty()) { return; } - + subscribeCodecFuture.onComplete((subscribeCodec, e) -> { if (subscribeCodec == null) { return; } - + if (topicType == PubSubType.PUNSUBSCRIBE) { psubscribe(channelName, listeners, subscribeCodec); } else { @@ -549,7 +570,7 @@ public class PublishSubscribeService { subscribe(channelName, listeners, subscribeCodec); return; } - + log.info("listeners of '{}' channel to '{}' have been resubscribed", channelName, res.getConnection().getRedisClient()); }); } @@ -562,19 +583,14 @@ public class PublishSubscribeService { psubscribe(channelName, listeners, subscribeCodec); return; } - + log.info("listeners of '{}' channel-pattern to '{}' have been resubscribed", channelName, res.getConnection().getRedisClient()); }); } - + @Override public String toString() { - return "PublishSubscribeService [name2PubSubConnection=" + name2PubSubConnection + ", freePubSubConnections=" - + freePubSubConnections + "]"; + return "PublishSubscribeService [name2PubSubConnection=" + name2PubSubConnection + ", freePubSubMap=" + freePubSubMap + "]"; } - - - - } diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 78bb77421..62ddbd62f 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.awaitility.Awaitility; import org.awaitility.Duration; import org.junit.After; import org.junit.AfterClass; @@ -1137,7 +1138,49 @@ public class RedissonTopicTest { t.start(); return t; } - + + @Test + public void testClusterSharding() throws IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + ClusterProcesses process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + AtomicInteger counter = new AtomicInteger(); + for (int i = 0; i < 10; i++) { + int j = i; + RTopic topic = redisson.getTopic("test" + i); + topic.addListener(Integer.class, (c, v) -> { + assertThat(v).isEqualTo(j); + counter.incrementAndGet(); + }); + } + + for (int i = 0; i < 10; i++) { + RTopic topic = redisson.getTopic("test" + i); + topic.publish(i); + } + + Awaitility.await().atMost(Duration.FIVE_SECONDS).until(() -> counter.get() == 10); + + redisson.shutdown(); + process.shutdown(); + } + @Test public void testReattachInClusterSlave() throws Exception { RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();