diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index b3094aca4..0300a28d6 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -64,9 +64,9 @@ public interface ConnectionManager { boolean isShuttingDown(); - RFuture subscribe(Codec codec, String channelName, RedisPubSubListener listener); + RFuture subscribe(Codec codec, String channelName, RedisPubSubListener... listeners); - RFuture subscribe(Codec codec, String channelName, RedisPubSubListener listener, AsyncSemaphore semaphore); + RFuture subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener... listeners); ConnectionInitializer getConnectListener(); @@ -106,9 +106,9 @@ public interface ConnectionManager { PubSubConnectionEntry getPubSubEntry(String channelName); - RFuture psubscribe(String pattern, Codec codec, RedisPubSubListener listener); + RFuture psubscribe(String pattern, Codec codec, RedisPubSubListener... listeners); - RFuture psubscribe(String pattern, Codec codec, RedisPubSubListener listener, AsyncSemaphore semaphore); + RFuture psubscribe(String pattern, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener... listeners); Codec unsubscribe(String channelName, AsyncSemaphore lock); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 8893ee84c..b903de260 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -364,41 +364,41 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public RFuture psubscribe(final String channelName, final Codec codec, final RedisPubSubListener listener) { + public RFuture psubscribe(final String channelName, final Codec codec, final RedisPubSubListener... listeners) { final AsyncSemaphore lock = getSemaphore(channelName); final RPromise result = newPromise(); lock.acquire(new Runnable() { @Override public void run() { - RFuture future = psubscribe(channelName, codec, listener, lock); + RFuture future = psubscribe(channelName, codec, lock, listeners); future.addListener(new TransferListener(result)); } }); return result; } - public RFuture psubscribe(String channelName, Codec codec, RedisPubSubListener listener, AsyncSemaphore semaphore) { + public RFuture psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener... listeners) { RPromise promise = newPromise(); - subscribe(codec, channelName, listener, promise, PubSubType.PSUBSCRIBE, semaphore); + subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners); return promise; } - public RFuture subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener) { + public RFuture subscribe(final Codec codec, final String channelName, final RedisPubSubListener... listeners) { final AsyncSemaphore lock = getSemaphore(channelName); final RPromise result = newPromise(); lock.acquire(new Runnable() { @Override public void run() { - RFuture future = subscribe(codec, channelName, listener, lock); + RFuture future = subscribe(codec, channelName, lock, listeners); future.addListener(new TransferListener(result)); } }); return result; } - public RFuture subscribe(Codec codec, String channelName, RedisPubSubListener listener, AsyncSemaphore semaphore) { + public RFuture subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener... listeners) { RPromise promise = newPromise(); - subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE, semaphore); + subscribe(codec, channelName, promise, PubSubType.SUBSCRIBE, semaphore, listeners); return promise; } @@ -406,18 +406,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return locks[Math.abs(channelName.hashCode() % locks.length)]; } - private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, - final RPromise promise, final PubSubType type, final AsyncSemaphore lock) { + private void subscribe(final Codec codec, final String channelName, + final RPromise promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener... listeners) { final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName); if (connEntry != null) { - connEntry.addListener(channelName, listener); - connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - lock.release(); - promise.trySuccess(connEntry); - } - }); + subscribe(channelName, promise, type, lock, connEntry, listeners); return; } @@ -431,7 +424,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { final PubSubConnectionEntry freeEntry = freePubSubConnections.peek(); if (freeEntry == null) { - connect(codec, channelName, listener, promise, type, lock); + connect(codec, channelName, promise, type, lock, listeners); return; } @@ -445,14 +438,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { freeEntry.release(); freePubSubLock.release(); - oldEntry.addListener(channelName, listener); - oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - lock.release(); - promise.trySuccess(oldEntry); - } - }); + subscribe(channelName, promise, type, lock, oldEntry, listeners); return; } @@ -461,14 +447,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } freePubSubLock.release(); - freeEntry.addListener(channelName, listener); - freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - lock.release(); - promise.trySuccess(freeEntry); - } - }); + subscribe(channelName, promise, type, lock, freeEntry, listeners); if (PubSubType.PSUBSCRIBE == type) { freeEntry.psubscribe(codec, channelName); @@ -480,8 +459,23 @@ public class MasterSlaveConnectionManager implements ConnectionManager { }); } - private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener, - final RPromise promise, final PubSubType type, final AsyncSemaphore lock) { + private void subscribe(final String channelName, final RPromise promise, + final PubSubType type, final AsyncSemaphore lock, final PubSubConnectionEntry connEntry, + final RedisPubSubListener... listeners) { + for (RedisPubSubListener listener : listeners) { + connEntry.addListener(channelName, listener); + } + connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + lock.release(); + promise.trySuccess(connEntry); + } + }); + } + + private void connect(final Codec codec, final String channelName, + final RPromise promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener... listeners) { final int slot = calcSlot(channelName); RFuture connFuture = nextPubSubConnection(slot); connFuture.addListener(new FutureListener() { @@ -505,29 +499,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager { releaseSubscribeConnection(slot, entry); freePubSubLock.release(); - - oldEntry.addListener(channelName, listener); - oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - lock.release(); - promise.trySuccess(oldEntry); - } - }); + + subscribe(channelName, promise, type, lock, oldEntry, listeners); return; } freePubSubConnections.add(entry); freePubSubLock.release(); - entry.addListener(channelName, listener); - entry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - lock.release(); - promise.trySuccess(entry); - } - }); + subscribe(channelName, promise, type, lock, entry, listeners); if (PubSubType.PSUBSCRIBE == type) { entry.psubscribe(codec, channelName); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 2fab36ea6..dffca6a57 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -211,21 +211,15 @@ public class MasterSlaveEntry { private void subscribe(final String channelName, final Collection> listeners, final Codec subscribeCodec) { - RFuture subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null); + RFuture subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[listeners.size()])); subscribeFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - subscribe(channelName, listeners, subscribeCodec); - return; - } - PubSubConnectionEntry newEntry = future.getNow(); - for (RedisPubSubListener redisPubSubListener : listeners) { - newEntry.addListener(channelName, redisPubSubListener); + if (future.isSuccess()) { + log.debug("resubscribed listeners of '{}' channel to {}", channelName, future.getNow().getConnection().getRedisClient()); } - log.debug("resubscribed listeners of '{}' channel to {}", channelName, newEntry.getConnection().getRedisClient()); } }); } diff --git a/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index f953f3ac4..38d59ea0c 100644 --- a/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.redisson.PubSubMessageListener; import org.redisson.PubSubPatternMessageListener; -import org.redisson.api.listener.MessageListener; import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java index d3da6d159..35bed9a3b 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java @@ -98,7 +98,7 @@ abstract class PublishSubscribe> { } RedisPubSubListener listener = createListener(channelName, value); - connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore); + connectionManager.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener); } }; semaphore.acquire(listener); diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 3270fa6bf..5eacbddd9 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -6,15 +6,15 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.AfterClass; @@ -22,6 +22,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner.RedisProcess; import org.redisson.api.RSet; import org.redisson.api.RTopic; @@ -463,7 +464,7 @@ public class RedissonTopicTest { } @Test - public void testReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException { + public void testReattach() throws Exception { RedisProcess runner = new RedisRunner() .nosave() .randomDir() @@ -475,14 +476,24 @@ public class RedissonTopicTest { RedissonClient redisson = Redisson.create(config); final AtomicBoolean executed = new AtomicBoolean(); + final AtomicInteger subscriptions = new AtomicInteger(); RTopic topic = redisson.getTopic("topic"); + topic.addListener(new StatusListener() { + + @Override + public void onUnsubscribe(String channel) { + } + + @Override + public void onSubscribe(String channel) { + subscriptions.incrementAndGet(); + } + }); topic.addListener(new MessageListener() { @Override public void onMessage(String channel, Integer msg) { - if (msg == 1) { - executed.set(true); - } + executed.set(true); } }); @@ -498,10 +509,77 @@ public class RedissonTopicTest { redisson.getTopic("topic").publish(1); - await().atMost(5, TimeUnit.SECONDS).untilTrue(executed); + await().atMost(2, TimeUnit.SECONDS).untilTrue(executed); + await().atMost(2, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); redisson.shutdown(); runner.stop(); } + + @Test + public void testReattachInCluster() throws Exception { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + ClusterProcesses process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + final AtomicBoolean executed = new AtomicBoolean(); + final AtomicInteger subscriptions = new AtomicInteger(); + + RTopic topic = redisson.getTopic("topic"); + topic.addListener(new StatusListener() { + + @Override + public void onUnsubscribe(String channel) { + } + + @Override + public void onSubscribe(String channel) { + subscriptions.incrementAndGet(); + } + }); + topic.addListener(new MessageListener() { + @Override + public void onMessage(String channel, Integer msg) { + executed.set(true); + } + }); + + process.getNodes().stream().filter(x -> Arrays.asList(slave1.getPort(), slave2.getPort(), slave3.getPort()).contains(x.getRedisServerPort())) + .forEach(x -> { + try { + x.stop(); + Thread.sleep(18000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }); + + Thread.sleep(15000); + + redisson.getTopic("topic").publish(1); + + await().atMost(75, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); + Assert.assertTrue(executed.get()); + + redisson.shutdown(); + process.shutdown(); + } + }