diff --git a/redisson/src/main/java/org/redisson/PubSubPatternMessageListener.java b/redisson/src/main/java/org/redisson/PubSubPatternMessageListener.java index 30b923d36..d07316f51 100644 --- a/redisson/src/main/java/org/redisson/PubSubPatternMessageListener.java +++ b/redisson/src/main/java/org/redisson/PubSubPatternMessageListener.java @@ -16,7 +16,6 @@ package org.redisson; import org.redisson.api.listener.PatternMessageListener; -import org.redisson.client.ChannelName; import org.redisson.client.RedisPubSubListener; import org.redisson.client.protocol.pubsub.PubSubType; @@ -30,15 +29,17 @@ public class PubSubPatternMessageListener implements RedisPubSubListener { private final PatternMessageListener listener; private final String name; + private final Class type; public String getName() { return name; } - public PubSubPatternMessageListener(PatternMessageListener listener, String name) { + public PubSubPatternMessageListener(Class type, PatternMessageListener listener, String name) { super(); this.listener = listener; this.name = name; + this.type = type; } @Override @@ -77,7 +78,7 @@ public class PubSubPatternMessageListener implements RedisPubSubListener { @Override public void onPatternMessage(CharSequence pattern, CharSequence channel, V message) { // could be subscribed to multiple channels - if (name.equals(pattern.toString())) { + if (name.equals(pattern.toString()) && message.getClass() == type) { listener.onMessage(pattern, channel, message); } } diff --git a/redisson/src/main/java/org/redisson/PubSubPatternStatusListener.java b/redisson/src/main/java/org/redisson/PubSubPatternStatusListener.java index 9a9d251e7..76c5421cb 100644 --- a/redisson/src/main/java/org/redisson/PubSubPatternStatusListener.java +++ b/redisson/src/main/java/org/redisson/PubSubPatternStatusListener.java @@ -16,7 +16,6 @@ package org.redisson; import org.redisson.api.listener.PatternStatusListener; -import org.redisson.client.ChannelName; import org.redisson.client.RedisPubSubListener; import org.redisson.client.protocol.pubsub.PubSubType; @@ -24,9 +23,8 @@ import org.redisson.client.protocol.pubsub.PubSubType; * * @author Nikita Koksharov * - * @param value */ -public class PubSubPatternStatusListener implements RedisPubSubListener { +public class PubSubPatternStatusListener implements RedisPubSubListener { private final PatternStatusListener listener; private final String name; @@ -67,11 +65,11 @@ public class PubSubPatternStatusListener implements RedisPubSubListener { } @Override - public void onMessage(CharSequence channel, V message) { + public void onMessage(CharSequence channel, Object message) { } @Override - public void onPatternMessage(CharSequence pattern, CharSequence channel, V message) { + public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { } @Override diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index 5c9d4bc76..a9adf4d34 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -185,9 +185,9 @@ public class Redisson implements RedissonClient { */ public static RedissonRxClient createRx(Config config) { RedissonRx react = new RedissonRx(config); -// if (config.isReferenceEnabled()) { -// react.enableRedissonReferenceSupport(); -// } + if (config.isReferenceEnabled()) { + react.enableRedissonReferenceSupport(); + } return react; } @@ -505,13 +505,13 @@ public class Redisson implements RedissonClient { } @Override - public RPatternTopic getPatternTopic(String pattern) { - return new RedissonPatternTopic(connectionManager.getCommandExecutor(), pattern); + public RPatternTopic getPatternTopic(String pattern) { + return new RedissonPatternTopic(connectionManager.getCommandExecutor(), pattern); } @Override - public RPatternTopic getPatternTopic(String pattern, Codec codec) { - return new RedissonPatternTopic(codec, connectionManager.getCommandExecutor(), pattern); + public RPatternTopic getPatternTopic(String pattern, Codec codec) { + return new RedissonPatternTopic(codec, connectionManager.getCommandExecutor(), pattern); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java index 1cd80a872..21ac70875 100644 --- a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java @@ -27,8 +27,6 @@ import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisTimeoutException; import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.command.CommandExecutor; -import org.redisson.command.CommandSyncExecutor; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; @@ -44,9 +42,8 @@ import io.netty.util.concurrent.FutureListener; * * @author Nikita Koksharov * - * @param message */ -public class RedissonPatternTopic implements RPatternTopic { +public class RedissonPatternTopic implements RPatternTopic { final PublishSubscribeService subscribeService; final CommandAsyncExecutor commandExecutor; @@ -68,12 +65,12 @@ public class RedissonPatternTopic implements RPatternTopic { @Override public int addListener(PatternStatusListener listener) { - return addListener(new PubSubPatternStatusListener(listener, name)); + return addListener(new PubSubPatternStatusListener(listener, name)); }; @Override - public int addListener(PatternMessageListener listener) { - PubSubPatternMessageListener pubSubListener = new PubSubPatternMessageListener(listener, name); + public int addListener(Class type, PatternMessageListener listener) { + PubSubPatternMessageListener pubSubListener = new PubSubPatternMessageListener(type, listener, name); return addListener(pubSubListener); } @@ -85,13 +82,13 @@ public class RedissonPatternTopic implements RPatternTopic { @Override public RFuture addListenerAsync(PatternStatusListener listener) { - PubSubPatternStatusListener pubSubListener = new PubSubPatternStatusListener(listener, name); + PubSubPatternStatusListener pubSubListener = new PubSubPatternStatusListener(listener, name); return addListenerAsync(pubSubListener); } @Override - public RFuture addListenerAsync(PatternMessageListener listener) { - PubSubPatternMessageListener pubSubListener = new PubSubPatternMessageListener(listener, name); + public RFuture addListenerAsync(Class type, PatternMessageListener listener) { + PubSubPatternMessageListener pubSubListener = new PubSubPatternMessageListener(type, listener, name); return addListenerAsync(pubSubListener); } @@ -159,7 +156,7 @@ public class RedissonPatternTopic implements RPatternTopic { } @Override - public void removeListener(PatternMessageListener listener) { + public void removeListener(PatternMessageListener listener) { AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); acquire(semaphore); diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 22e0a39ea..82cac62dd 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -304,13 +304,13 @@ public class RedissonReactive implements RedissonReactiveClient { } @Override - public RPatternTopicReactive getPatternTopic(String pattern) { - return ReactiveProxyBuilder.create(commandExecutor, new RedissonPatternTopic(commandExecutor, pattern), RPatternTopicReactive.class); + public RPatternTopicReactive getPatternTopic(String pattern) { + return ReactiveProxyBuilder.create(commandExecutor, new RedissonPatternTopic(commandExecutor, pattern), RPatternTopicReactive.class); } @Override - public RPatternTopicReactive getPatternTopic(String pattern, Codec codec) { - return ReactiveProxyBuilder.create(commandExecutor, new RedissonPatternTopic(codec, commandExecutor, pattern), RPatternTopicReactive.class); + public RPatternTopicReactive getPatternTopic(String pattern, Codec codec) { + return ReactiveProxyBuilder.create(commandExecutor, new RedissonPatternTopic(codec, commandExecutor, pattern), RPatternTopicReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonRx.java b/redisson/src/main/java/org/redisson/RedissonRx.java index c46d121c9..623c8f9ec 100644 --- a/redisson/src/main/java/org/redisson/RedissonRx.java +++ b/redisson/src/main/java/org/redisson/RedissonRx.java @@ -291,13 +291,13 @@ public class RedissonRx implements RedissonRxClient { } @Override - public RPatternTopicRx getPatternTopic(String pattern) { - return RxProxyBuilder.create(commandExecutor, new RedissonTopic(commandExecutor, pattern), RPatternTopicRx.class); + public RPatternTopicRx getPatternTopic(String pattern) { + return RxProxyBuilder.create(commandExecutor, new RedissonPatternTopic(commandExecutor, pattern), RPatternTopicRx.class); } @Override - public RPatternTopicRx getPatternTopic(String pattern, Codec codec) { - return RxProxyBuilder.create(commandExecutor, new RedissonTopic(codec, commandExecutor, pattern), RPatternTopicRx.class); + public RPatternTopicRx getPatternTopic(String pattern, Codec codec) { + return RxProxyBuilder.create(commandExecutor, new RedissonPatternTopic(codec, commandExecutor, pattern), RPatternTopicRx.class); } @Override @@ -426,9 +426,9 @@ public class RedissonRx implements RedissonRxClient { return connectionManager.isShuttingDown(); } -// protected void enableRedissonReferenceSupport() { -// this.commandExecutor.enableRedissonReferenceSupport(this); -// } + protected void enableRedissonReferenceSupport() { + this.commandExecutor.enableRedissonReferenceSupport(this); + } @Override public RMapCacheRx getMapCache(String name, Codec codec, MapOptions options) { diff --git a/redisson/src/main/java/org/redisson/api/RPatternTopic.java b/redisson/src/main/java/org/redisson/api/RPatternTopic.java index d5b187af7..9d2fb43f3 100644 --- a/redisson/src/main/java/org/redisson/api/RPatternTopic.java +++ b/redisson/src/main/java/org/redisson/api/RPatternTopic.java @@ -25,9 +25,8 @@ import org.redisson.api.listener.PatternStatusListener; * * @author Nikita Koksharov * - * @param the type of message object */ -public interface RPatternTopic { +public interface RPatternTopic { /** * Get topic channel patterns @@ -40,12 +39,14 @@ public interface RPatternTopic { * Subscribes to this topic. * MessageListener.onMessage is called when any message * is published on this topic. - * + * + * @param type of message + * @param type - type of message * @param listener - message listener * @return local JVM unique listener id * @see org.redisson.api.listener.MessageListener */ - int addListener(PatternMessageListener listener); + int addListener(Class type, PatternMessageListener listener); /** * Subscribes to status changes of this topic @@ -68,7 +69,7 @@ public interface RPatternTopic { * * @param listener - listener instance */ - void removeListener(PatternMessageListener listener); + void removeListener(PatternMessageListener listener); /** * Removes all listeners from this topic @@ -77,6 +78,6 @@ public interface RPatternTopic { RFuture addListenerAsync(PatternStatusListener listener); - RFuture addListenerAsync(PatternMessageListener listener); + RFuture addListenerAsync(Class type, PatternMessageListener listener); } diff --git a/redisson/src/main/java/org/redisson/api/RPatternTopicReactive.java b/redisson/src/main/java/org/redisson/api/RPatternTopicReactive.java index 3bf635978..675e2b05e 100644 --- a/redisson/src/main/java/org/redisson/api/RPatternTopicReactive.java +++ b/redisson/src/main/java/org/redisson/api/RPatternTopicReactive.java @@ -26,9 +26,8 @@ import org.redisson.api.listener.PatternStatusListener; * * @author Nikita Koksharov * - * @param the type of message object */ -public interface RPatternTopicReactive { +public interface RPatternTopicReactive { /** * Get topic channel patterns @@ -41,12 +40,14 @@ public interface RPatternTopicReactive { * Subscribes to this topic. * MessageListener.onMessage is called when any message * is published on this topic. - * + * + * @param type of message + * @param type - type of message * @param listener - message listener * @return local JVM unique listener id * @see org.redisson.api.listener.MessageListener */ - Publisher addListener(PatternMessageListener listener); + Publisher addListener(Class type, PatternMessageListener listener); /** * Subscribes to status changes of this topic diff --git a/redisson/src/main/java/org/redisson/api/RPatternTopicRx.java b/redisson/src/main/java/org/redisson/api/RPatternTopicRx.java index 7bc3c8780..5120c1516 100644 --- a/redisson/src/main/java/org/redisson/api/RPatternTopicRx.java +++ b/redisson/src/main/java/org/redisson/api/RPatternTopicRx.java @@ -27,9 +27,8 @@ import io.reactivex.Flowable; * * @author Nikita Koksharov * - * @param the type of message object */ -public interface RPatternTopicRx { +public interface RPatternTopicRx { /** * Get topic channel patterns @@ -42,12 +41,14 @@ public interface RPatternTopicRx { * Subscribes to this topic. * MessageListener.onMessage is called when any message * is published on this topic. - * + * + * @param type of message + * @param type - type of message * @param listener - message listener * @return local JVM unique listener id * @see org.redisson.api.listener.MessageListener */ - Flowable addListener(PatternMessageListener listener); + Flowable addListener(Class type, PatternMessageListener listener); /** * Subscribes to status changes of this topic diff --git a/redisson/src/main/java/org/redisson/api/listener/PatternMessageListener.java b/redisson/src/main/java/org/redisson/api/listener/PatternMessageListener.java index 19b2ba0ce..f3f2b8c18 100644 --- a/redisson/src/main/java/org/redisson/api/listener/PatternMessageListener.java +++ b/redisson/src/main/java/org/redisson/api/listener/PatternMessageListener.java @@ -17,8 +17,6 @@ package org.redisson.api.listener; import java.util.EventListener; -import org.redisson.client.ChannelName; - /** * Listener for Redis messages published via RTopic Redisson object * diff --git a/redisson/src/main/java/org/redisson/spring/session/RedissonSessionRepository.java b/redisson/src/main/java/org/redisson/spring/session/RedissonSessionRepository.java index e14e9f352..e5653bbcb 100644 --- a/redisson/src/main/java/org/redisson/spring/session/RedissonSessionRepository.java +++ b/redisson/src/main/java/org/redisson/spring/session/RedissonSessionRepository.java @@ -28,7 +28,6 @@ import org.redisson.api.RSet; import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.redisson.api.listener.PatternMessageListener; -import org.redisson.client.ChannelName; import org.redisson.client.codec.StringCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -222,9 +221,9 @@ public class RedissonSessionRepository implements FindByIndexNameSessionReposito private RedissonClient redisson; private ApplicationEventPublisher eventPublisher; - private RPatternTopic deletedTopic; - private RPatternTopic expiredTopic; - private RPatternTopic createdTopic; + private RPatternTopic deletedTopic; + private RPatternTopic expiredTopic; + private RPatternTopic createdTopic; private String keyPrefix = "spring:session:"; private Integer defaultMaxInactiveInterval; @@ -234,11 +233,11 @@ public class RedissonSessionRepository implements FindByIndexNameSessionReposito this.eventPublisher = eventPublisher; deletedTopic = redisson.getPatternTopic("__keyevent@*:del", StringCodec.INSTANCE); - deletedTopic.addListener(this); + deletedTopic.addListener(String.class, this); expiredTopic = redisson.getPatternTopic("__keyevent@*:expired", StringCodec.INSTANCE); - expiredTopic.addListener(this); + expiredTopic.addListener(String.class, this); createdTopic = redisson.getPatternTopic(getEventsChannelPrefix() + "*", StringCodec.INSTANCE); - createdTopic.addListener(this); + createdTopic.addListener(String.class, this); } @Override diff --git a/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java b/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java index 03468ce94..23ba50d02 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicPatternTest.java @@ -13,12 +13,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; -import org.junit.After; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.redisson.RedisRunner.RedisProcess; import org.redisson.api.RPatternTopic; @@ -29,35 +26,7 @@ import org.redisson.api.listener.PatternMessageListener; import org.redisson.api.listener.PatternStatusListener; import org.redisson.config.Config; -public class RedissonTopicPatternTest { - - @BeforeClass - public static void beforeClass() throws IOException, InterruptedException { - if (!RedissonRuntimeEnvironment.isTravis) { - RedisRunner.startDefaultRedisServerInstance(); - } - } - - @AfterClass - public static void afterClass() throws IOException, InterruptedException { - if (!RedissonRuntimeEnvironment.isTravis) { - RedisRunner.shutDownDefaultRedisServerInstance(); - } - } - - @Before - public void before() throws IOException, InterruptedException { - if (RedissonRuntimeEnvironment.isTravis) { - RedisRunner.startDefaultRedisServerInstance(); - } - } - - @After - public void after() throws InterruptedException { - if (RedissonRuntimeEnvironment.isTravis) { - RedisRunner.shutDownDefaultRedisServerInstance(); - } - } +public class RedissonTopicPatternTest extends BaseTest { public static class Message { @@ -96,17 +65,37 @@ public class RedissonTopicPatternTest { return "Message{" + "name='" + name + '\'' + '}'; } } + + @Test + public void testMultiType() throws InterruptedException { + RPatternTopic topic1 = redisson.getPatternTopic("topic1.*"); + AtomicInteger str = new AtomicInteger(); + topic1.addListener(String.class, (pattern, channel, msg) -> { + str.incrementAndGet(); + }); + AtomicInteger i = new AtomicInteger(); + topic1.addListener(Integer.class, (pattern, channel, msg) -> { + i.incrementAndGet(); + }); + + redisson.getTopic("topic1.str").publish("123"); + redisson.getTopic("topic1.int").publish(123); + + Thread.sleep(500); + + Assert.assertEquals(i.get(), 1); + Assert.assertEquals(str.get(), 1); + } @Test public void testUnsubscribe() throws InterruptedException { final CountDownLatch messageRecieved = new CountDownLatch(1); - RedissonClient redisson = BaseTest.createInstance(); - RPatternTopic topic1 = redisson.getPatternTopic("topic1.*"); - int listenerId = topic1.addListener((pattern, channel, msg) -> { + RPatternTopic topic1 = redisson.getPatternTopic("topic1.*"); + int listenerId = topic1.addListener(Message.class, (pattern, channel, msg) -> { Assert.fail(); }); - topic1.addListener((pattern, channel, msg) -> { + topic1.addListener(Message.class, (pattern, channel, msg) -> { Assert.assertTrue(pattern.equals("topic1.*")); Assert.assertTrue(channel.equals("topic1.t3")); Assert.assertEquals(new Message("123"), msg); @@ -117,8 +106,6 @@ public class RedissonTopicPatternTest { redisson.getTopic("topic1.t3").publish(new Message("123")); Assert.assertTrue(messageRecieved.await(5, TimeUnit.SECONDS)); - - redisson.shutdown(); } @Test @@ -126,8 +113,8 @@ public class RedissonTopicPatternTest { final CountDownLatch messageRecieved = new CountDownLatch(1); RedissonClient redisson1 = BaseTest.createInstance(); - RPatternTopic topic1 = redisson1.getPatternTopic("topic.*"); - int listenerId = topic1.addListener((pattern, channel, msg) -> { + RPatternTopic topic1 = redisson1.getPatternTopic("topic.*"); + int listenerId = topic1.addListener(Message.class, (pattern, channel, msg) -> { Assert.fail(); }); @@ -136,8 +123,8 @@ public class RedissonTopicPatternTest { Thread.sleep(1000); RedissonClient redisson2 = BaseTest.createInstance(); - RPatternTopic topic2 = redisson2.getPatternTopic("topic.*"); - topic2.addListener((pattern, channel, msg) -> { + RPatternTopic topic2 = redisson2.getPatternTopic("topic.*"); + topic2.addListener(Message.class, (pattern, channel, msg) -> { Assert.assertTrue(pattern.equals("topic.*")); Assert.assertTrue(channel.equals("topic.t1")); Assert.assertEquals(new Message("123"), msg); @@ -159,7 +146,7 @@ public class RedissonTopicPatternTest { final CountDownLatch statusRecieved = new CountDownLatch(1); RedissonClient redisson1 = BaseTest.createInstance(); - RPatternTopic topic1 = redisson1.getPatternTopic("topic.*"); + RPatternTopic topic1 = redisson1.getPatternTopic("topic.*"); topic1.addListener(new BasePatternStatusListener() { @Override public void onPSubscribe(String pattern) { @@ -167,7 +154,7 @@ public class RedissonTopicPatternTest { statusRecieved.countDown(); } }); - topic1.addListener((pattern, channel, msg) -> { + topic1.addListener(Message.class, (pattern, channel, msg) -> { Assert.assertEquals(new Message("123"), msg); messageRecieved.countDown(); }); @@ -199,7 +186,7 @@ public class RedissonTopicPatternTest { @Test public void testListenerRemove() throws InterruptedException { RedissonClient redisson1 = BaseTest.createInstance(); - RPatternTopic topic1 = redisson1.getPatternTopic("topic.*"); + RPatternTopic topic1 = redisson1.getPatternTopic("topic.*"); final CountDownLatch l = new CountDownLatch(1); topic1.addListener(new BasePatternStatusListener() { @Override @@ -208,7 +195,7 @@ public class RedissonTopicPatternTest { l.countDown(); } }); - int id = topic1.addListener((pattern, channel, msg) -> { + int id = topic1.addListener(Message.class, (pattern, channel, msg) -> { Assert.fail(); }); @@ -223,9 +210,6 @@ public class RedissonTopicPatternTest { @Test public void testConcurrentTopic() throws Exception { - Config config = BaseTest.createConfig(); - RedissonClient redisson = Redisson.create(config); - int threads = 30; int loops = 50000; @@ -238,7 +222,7 @@ public class RedissonTopicPatternTest { @Override public void run() { for (int j = 0; j < loops; j++) { - RPatternTopic t = redisson.getPatternTopic("PUBSUB*"); + RPatternTopic t = redisson.getPatternTopic("PUBSUB*"); int listenerId = t.addListener(new PatternStatusListener() { @Override public void onPUnsubscribe(String channel) { @@ -262,8 +246,6 @@ public class RedissonTopicPatternTest { for (Future future : futures) { future.get(); } - - redisson.shutdown(); } @Test @@ -280,8 +262,8 @@ public class RedissonTopicPatternTest { final AtomicBoolean executed = new AtomicBoolean(); - RPatternTopic topic = redisson.getPatternTopic("topic*"); - topic.addListener(new PatternMessageListener() { + RPatternTopic topic = redisson.getPatternTopic("topic*"); + topic.addListener(Integer.class, new PatternMessageListener() { @Override public void onMessage(CharSequence pattern, CharSequence channel, Integer msg) { if (msg == 1) { diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index c9d0d742a..a00b2f009 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -26,6 +26,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner.RedisProcess; +import org.redisson.RedissonTopicPatternTest.Message; import org.redisson.api.RFuture; import org.redisson.api.RPatternTopic; import org.redisson.api.RSet; @@ -205,8 +206,8 @@ public class RedissonTopicTest { stringMessageReceived.incrementAndGet(); } }); - RPatternTopic patternTopic = redisson.getPatternTopic("test*", StringCodec.INSTANCE); - int patternListenerId = patternTopic.addListener(new PatternMessageListener() { + RPatternTopic patternTopic = redisson.getPatternTopic("test*", StringCodec.INSTANCE); + int patternListenerId = patternTopic.addListener(String.class, new PatternMessageListener() { @Override public void onMessage(CharSequence pattern, CharSequence channel, String msg) { assertThat(msg).isEqualTo("testmsg");