Feature - multi type listeners support for RPatternTopic object. #731

pull/1705/head
Nikita 6 years ago
parent 59cc93be95
commit f52612c383

@ -16,7 +16,6 @@
package org.redisson;
import org.redisson.api.listener.PatternMessageListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.pubsub.PubSubType;
@ -30,15 +29,17 @@ public class PubSubPatternMessageListener<V> implements RedisPubSubListener<V> {
private final PatternMessageListener<V> listener;
private final String name;
private final Class<V> type;
public String getName() {
return name;
}
public PubSubPatternMessageListener(PatternMessageListener<V> listener, String name) {
public PubSubPatternMessageListener(Class<V> type, PatternMessageListener<V> listener, String name) {
super();
this.listener = listener;
this.name = name;
this.type = type;
}
@Override
@ -77,7 +78,7 @@ public class PubSubPatternMessageListener<V> implements RedisPubSubListener<V> {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, V message) {
// could be subscribed to multiple channels
if (name.equals(pattern.toString())) {
if (name.equals(pattern.toString()) && message.getClass() == type) {
listener.onMessage(pattern, channel, message);
}
}

@ -16,7 +16,6 @@
package org.redisson;
import org.redisson.api.listener.PatternStatusListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.pubsub.PubSubType;
@ -24,9 +23,8 @@ import org.redisson.client.protocol.pubsub.PubSubType;
*
* @author Nikita Koksharov
*
* @param <V> value
*/
public class PubSubPatternStatusListener<V> implements RedisPubSubListener<V> {
public class PubSubPatternStatusListener implements RedisPubSubListener<Object> {
private final PatternStatusListener listener;
private final String name;
@ -67,11 +65,11 @@ public class PubSubPatternStatusListener<V> implements RedisPubSubListener<V> {
}
@Override
public void onMessage(CharSequence channel, V message) {
public void onMessage(CharSequence channel, Object message) {
}
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, V message) {
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
}
@Override

@ -185,9 +185,9 @@ public class Redisson implements RedissonClient {
*/
public static RedissonRxClient createRx(Config config) {
RedissonRx react = new RedissonRx(config);
// if (config.isReferenceEnabled()) {
// react.enableRedissonReferenceSupport();
// }
if (config.isReferenceEnabled()) {
react.enableRedissonReferenceSupport();
}
return react;
}
@ -505,13 +505,13 @@ public class Redisson implements RedissonClient {
}
@Override
public <M> RPatternTopic<M> getPatternTopic(String pattern) {
return new RedissonPatternTopic<M>(connectionManager.getCommandExecutor(), pattern);
public RPatternTopic getPatternTopic(String pattern) {
return new RedissonPatternTopic(connectionManager.getCommandExecutor(), pattern);
}
@Override
public <M> RPatternTopic<M> getPatternTopic(String pattern, Codec codec) {
return new RedissonPatternTopic<M>(codec, connectionManager.getCommandExecutor(), pattern);
public RPatternTopic getPatternTopic(String pattern, Codec codec) {
return new RedissonPatternTopic(codec, connectionManager.getCommandExecutor(), pattern);
}
@Override

@ -27,8 +27,6 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandSyncExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -44,9 +42,8 @@ import io.netty.util.concurrent.FutureListener;
*
* @author Nikita Koksharov
*
* @param <M> message
*/
public class RedissonPatternTopic<M> implements RPatternTopic<M> {
public class RedissonPatternTopic implements RPatternTopic {
final PublishSubscribeService subscribeService;
final CommandAsyncExecutor commandExecutor;
@ -68,12 +65,12 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
@Override
public int addListener(PatternStatusListener listener) {
return addListener(new PubSubPatternStatusListener<Object>(listener, name));
return addListener(new PubSubPatternStatusListener(listener, name));
};
@Override
public int addListener(PatternMessageListener<M> listener) {
PubSubPatternMessageListener<M> pubSubListener = new PubSubPatternMessageListener<M>(listener, name);
public <T> int addListener(Class<T> type, PatternMessageListener<T> listener) {
PubSubPatternMessageListener<T> pubSubListener = new PubSubPatternMessageListener<T>(type, listener, name);
return addListener(pubSubListener);
}
@ -85,13 +82,13 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
@Override
public RFuture<Integer> addListenerAsync(PatternStatusListener listener) {
PubSubPatternStatusListener<M> pubSubListener = new PubSubPatternStatusListener<M>(listener, name);
PubSubPatternStatusListener pubSubListener = new PubSubPatternStatusListener(listener, name);
return addListenerAsync(pubSubListener);
}
@Override
public RFuture<Integer> addListenerAsync(PatternMessageListener<M> listener) {
PubSubPatternMessageListener<M> pubSubListener = new PubSubPatternMessageListener<M>(listener, name);
public <T> RFuture<Integer> addListenerAsync(Class<T> type, PatternMessageListener<T> listener) {
PubSubPatternMessageListener<T> pubSubListener = new PubSubPatternMessageListener<T>(type, listener, name);
return addListenerAsync(pubSubListener);
}
@ -159,7 +156,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
}
@Override
public void removeListener(PatternMessageListener<M> listener) {
public void removeListener(PatternMessageListener<?> listener) {
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
acquire(semaphore);

@ -304,13 +304,13 @@ public class RedissonReactive implements RedissonReactiveClient {
}
@Override
public <M> RPatternTopicReactive<M> getPatternTopic(String pattern) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonPatternTopic<M>(commandExecutor, pattern), RPatternTopicReactive.class);
public RPatternTopicReactive getPatternTopic(String pattern) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonPatternTopic(commandExecutor, pattern), RPatternTopicReactive.class);
}
@Override
public <M> RPatternTopicReactive<M> getPatternTopic(String pattern, Codec codec) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonPatternTopic<M>(codec, commandExecutor, pattern), RPatternTopicReactive.class);
public RPatternTopicReactive getPatternTopic(String pattern, Codec codec) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonPatternTopic(codec, commandExecutor, pattern), RPatternTopicReactive.class);
}
@Override

@ -291,13 +291,13 @@ public class RedissonRx implements RedissonRxClient {
}
@Override
public <M> RPatternTopicRx<M> getPatternTopic(String pattern) {
return RxProxyBuilder.create(commandExecutor, new RedissonTopic<M>(commandExecutor, pattern), RPatternTopicRx.class);
public RPatternTopicRx getPatternTopic(String pattern) {
return RxProxyBuilder.create(commandExecutor, new RedissonPatternTopic(commandExecutor, pattern), RPatternTopicRx.class);
}
@Override
public <M> RPatternTopicRx<M> getPatternTopic(String pattern, Codec codec) {
return RxProxyBuilder.create(commandExecutor, new RedissonTopic<M>(codec, commandExecutor, pattern), RPatternTopicRx.class);
public RPatternTopicRx getPatternTopic(String pattern, Codec codec) {
return RxProxyBuilder.create(commandExecutor, new RedissonPatternTopic(codec, commandExecutor, pattern), RPatternTopicRx.class);
}
@Override
@ -426,9 +426,9 @@ public class RedissonRx implements RedissonRxClient {
return connectionManager.isShuttingDown();
}
// protected void enableRedissonReferenceSupport() {
// this.commandExecutor.enableRedissonReferenceSupport(this);
// }
protected void enableRedissonReferenceSupport() {
this.commandExecutor.enableRedissonReferenceSupport(this);
}
@Override
public <K, V> RMapCacheRx<K, V> getMapCache(String name, Codec codec, MapOptions<K, V> options) {

@ -25,9 +25,8 @@ import org.redisson.api.listener.PatternStatusListener;
*
* @author Nikita Koksharov
*
* @param <M> the type of message object
*/
public interface RPatternTopic<M> {
public interface RPatternTopic {
/**
* Get topic channel patterns
@ -40,12 +39,14 @@ public interface RPatternTopic<M> {
* Subscribes to this topic.
* <code>MessageListener.onMessage</code> is called when any message
* is published on this topic.
*
*
* @param <T> type of message
* @param type - type of message
* @param listener - message listener
* @return local JVM unique listener id
* @see org.redisson.api.listener.MessageListener
*/
int addListener(PatternMessageListener<M> listener);
<T> int addListener(Class<T> type, PatternMessageListener<T> listener);
/**
* Subscribes to status changes of this topic
@ -68,7 +69,7 @@ public interface RPatternTopic<M> {
*
* @param listener - listener instance
*/
void removeListener(PatternMessageListener<M> listener);
void removeListener(PatternMessageListener<?> listener);
/**
* Removes all listeners from this topic
@ -77,6 +78,6 @@ public interface RPatternTopic<M> {
RFuture<Integer> addListenerAsync(PatternStatusListener listener);
RFuture<Integer> addListenerAsync(PatternMessageListener<M> listener);
<T> RFuture<Integer> addListenerAsync(Class<T> type, PatternMessageListener<T> listener);
}

@ -26,9 +26,8 @@ import org.redisson.api.listener.PatternStatusListener;
*
* @author Nikita Koksharov
*
* @param <M> the type of message object
*/
public interface RPatternTopicReactive<M> {
public interface RPatternTopicReactive {
/**
* Get topic channel patterns
@ -41,12 +40,14 @@ public interface RPatternTopicReactive<M> {
* Subscribes to this topic.
* <code>MessageListener.onMessage</code> is called when any message
* is published on this topic.
*
*
* @param <T> type of message
* @param type - type of message
* @param listener - message listener
* @return local JVM unique listener id
* @see org.redisson.api.listener.MessageListener
*/
Publisher<Integer> addListener(PatternMessageListener<M> listener);
<T> Publisher<Integer> addListener(Class<T> type, PatternMessageListener<T> listener);
/**
* Subscribes to status changes of this topic

@ -27,9 +27,8 @@ import io.reactivex.Flowable;
*
* @author Nikita Koksharov
*
* @param <M> the type of message object
*/
public interface RPatternTopicRx<M> {
public interface RPatternTopicRx {
/**
* Get topic channel patterns
@ -42,12 +41,14 @@ public interface RPatternTopicRx<M> {
* Subscribes to this topic.
* <code>MessageListener.onMessage</code> is called when any message
* is published on this topic.
*
*
* @param <T> type of message
* @param type - type of message
* @param listener - message listener
* @return local JVM unique listener id
* @see org.redisson.api.listener.MessageListener
*/
Flowable<Integer> addListener(PatternMessageListener<M> listener);
<T> Flowable<Integer> addListener(Class<T> type, PatternMessageListener<T> listener);
/**
* Subscribes to status changes of this topic

@ -17,8 +17,6 @@ package org.redisson.api.listener;
import java.util.EventListener;
import org.redisson.client.ChannelName;
/**
* Listener for Redis messages published via RTopic Redisson object
*

@ -28,7 +28,6 @@ import org.redisson.api.RSet;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.PatternMessageListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.StringCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -222,9 +221,9 @@ public class RedissonSessionRepository implements FindByIndexNameSessionReposito
private RedissonClient redisson;
private ApplicationEventPublisher eventPublisher;
private RPatternTopic<String> deletedTopic;
private RPatternTopic<String> expiredTopic;
private RPatternTopic<String> createdTopic;
private RPatternTopic deletedTopic;
private RPatternTopic expiredTopic;
private RPatternTopic createdTopic;
private String keyPrefix = "spring:session:";
private Integer defaultMaxInactiveInterval;
@ -234,11 +233,11 @@ public class RedissonSessionRepository implements FindByIndexNameSessionReposito
this.eventPublisher = eventPublisher;
deletedTopic = redisson.getPatternTopic("__keyevent@*:del", StringCodec.INSTANCE);
deletedTopic.addListener(this);
deletedTopic.addListener(String.class, this);
expiredTopic = redisson.getPatternTopic("__keyevent@*:expired", StringCodec.INSTANCE);
expiredTopic.addListener(this);
expiredTopic.addListener(String.class, this);
createdTopic = redisson.getPatternTopic(getEventsChannelPrefix() + "*", StringCodec.INSTANCE);
createdTopic.addListener(this);
createdTopic.addListener(String.class, this);
}
@Override

@ -13,12 +13,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.RPatternTopic;
@ -29,35 +26,7 @@ import org.redisson.api.listener.PatternMessageListener;
import org.redisson.api.listener.PatternStatusListener;
import org.redisson.config.Config;
public class RedissonTopicPatternTest {
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
}
}
@AfterClass
public static void afterClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
@Before
public void before() throws IOException, InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
}
}
@After
public void after() throws InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.shutDownDefaultRedisServerInstance();
}
}
public class RedissonTopicPatternTest extends BaseTest {
public static class Message {
@ -96,17 +65,37 @@ public class RedissonTopicPatternTest {
return "Message{" + "name='" + name + '\'' + '}';
}
}
@Test
public void testMultiType() throws InterruptedException {
RPatternTopic topic1 = redisson.getPatternTopic("topic1.*");
AtomicInteger str = new AtomicInteger();
topic1.addListener(String.class, (pattern, channel, msg) -> {
str.incrementAndGet();
});
AtomicInteger i = new AtomicInteger();
topic1.addListener(Integer.class, (pattern, channel, msg) -> {
i.incrementAndGet();
});
redisson.getTopic("topic1.str").publish("123");
redisson.getTopic("topic1.int").publish(123);
Thread.sleep(500);
Assert.assertEquals(i.get(), 1);
Assert.assertEquals(str.get(), 1);
}
@Test
public void testUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
RedissonClient redisson = BaseTest.createInstance();
RPatternTopic<Message> topic1 = redisson.getPatternTopic("topic1.*");
int listenerId = topic1.addListener((pattern, channel, msg) -> {
RPatternTopic topic1 = redisson.getPatternTopic("topic1.*");
int listenerId = topic1.addListener(Message.class, (pattern, channel, msg) -> {
Assert.fail();
});
topic1.addListener((pattern, channel, msg) -> {
topic1.addListener(Message.class, (pattern, channel, msg) -> {
Assert.assertTrue(pattern.equals("topic1.*"));
Assert.assertTrue(channel.equals("topic1.t3"));
Assert.assertEquals(new Message("123"), msg);
@ -117,8 +106,6 @@ public class RedissonTopicPatternTest {
redisson.getTopic("topic1.t3").publish(new Message("123"));
Assert.assertTrue(messageRecieved.await(5, TimeUnit.SECONDS));
redisson.shutdown();
}
@Test
@ -126,8 +113,8 @@ public class RedissonTopicPatternTest {
final CountDownLatch messageRecieved = new CountDownLatch(1);
RedissonClient redisson1 = BaseTest.createInstance();
RPatternTopic<Message> topic1 = redisson1.getPatternTopic("topic.*");
int listenerId = topic1.addListener((pattern, channel, msg) -> {
RPatternTopic topic1 = redisson1.getPatternTopic("topic.*");
int listenerId = topic1.addListener(Message.class, (pattern, channel, msg) -> {
Assert.fail();
});
@ -136,8 +123,8 @@ public class RedissonTopicPatternTest {
Thread.sleep(1000);
RedissonClient redisson2 = BaseTest.createInstance();
RPatternTopic<Message> topic2 = redisson2.getPatternTopic("topic.*");
topic2.addListener((pattern, channel, msg) -> {
RPatternTopic topic2 = redisson2.getPatternTopic("topic.*");
topic2.addListener(Message.class, (pattern, channel, msg) -> {
Assert.assertTrue(pattern.equals("topic.*"));
Assert.assertTrue(channel.equals("topic.t1"));
Assert.assertEquals(new Message("123"), msg);
@ -159,7 +146,7 @@ public class RedissonTopicPatternTest {
final CountDownLatch statusRecieved = new CountDownLatch(1);
RedissonClient redisson1 = BaseTest.createInstance();
RPatternTopic<Message> topic1 = redisson1.getPatternTopic("topic.*");
RPatternTopic topic1 = redisson1.getPatternTopic("topic.*");
topic1.addListener(new BasePatternStatusListener() {
@Override
public void onPSubscribe(String pattern) {
@ -167,7 +154,7 @@ public class RedissonTopicPatternTest {
statusRecieved.countDown();
}
});
topic1.addListener((pattern, channel, msg) -> {
topic1.addListener(Message.class, (pattern, channel, msg) -> {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
});
@ -199,7 +186,7 @@ public class RedissonTopicPatternTest {
@Test
public void testListenerRemove() throws InterruptedException {
RedissonClient redisson1 = BaseTest.createInstance();
RPatternTopic<Message> topic1 = redisson1.getPatternTopic("topic.*");
RPatternTopic topic1 = redisson1.getPatternTopic("topic.*");
final CountDownLatch l = new CountDownLatch(1);
topic1.addListener(new BasePatternStatusListener() {
@Override
@ -208,7 +195,7 @@ public class RedissonTopicPatternTest {
l.countDown();
}
});
int id = topic1.addListener((pattern, channel, msg) -> {
int id = topic1.addListener(Message.class, (pattern, channel, msg) -> {
Assert.fail();
});
@ -223,9 +210,6 @@ public class RedissonTopicPatternTest {
@Test
public void testConcurrentTopic() throws Exception {
Config config = BaseTest.createConfig();
RedissonClient redisson = Redisson.create(config);
int threads = 30;
int loops = 50000;
@ -238,7 +222,7 @@ public class RedissonTopicPatternTest {
@Override
public void run() {
for (int j = 0; j < loops; j++) {
RPatternTopic<String> t = redisson.getPatternTopic("PUBSUB*");
RPatternTopic t = redisson.getPatternTopic("PUBSUB*");
int listenerId = t.addListener(new PatternStatusListener() {
@Override
public void onPUnsubscribe(String channel) {
@ -262,8 +246,6 @@ public class RedissonTopicPatternTest {
for (Future<?> future : futures) {
future.get();
}
redisson.shutdown();
}
@Test
@ -280,8 +262,8 @@ public class RedissonTopicPatternTest {
final AtomicBoolean executed = new AtomicBoolean();
RPatternTopic<Integer> topic = redisson.getPatternTopic("topic*");
topic.addListener(new PatternMessageListener<Integer>() {
RPatternTopic topic = redisson.getPatternTopic("topic*");
topic.addListener(Integer.class, new PatternMessageListener<Integer>() {
@Override
public void onMessage(CharSequence pattern, CharSequence channel, Integer msg) {
if (msg == 1) {

@ -26,6 +26,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.RedissonTopicPatternTest.Message;
import org.redisson.api.RFuture;
import org.redisson.api.RPatternTopic;
import org.redisson.api.RSet;
@ -205,8 +206,8 @@ public class RedissonTopicTest {
stringMessageReceived.incrementAndGet();
}
});
RPatternTopic<String> patternTopic = redisson.getPatternTopic("test*", StringCodec.INSTANCE);
int patternListenerId = patternTopic.addListener(new PatternMessageListener<String>() {
RPatternTopic patternTopic = redisson.getPatternTopic("test*", StringCodec.INSTANCE);
int patternListenerId = patternTopic.addListener(String.class, new PatternMessageListener<String>() {
@Override
public void onMessage(CharSequence pattern, CharSequence channel, String msg) {
assertThat(msg).isEqualTo("testmsg");

Loading…
Cancel
Save