Merge pull request #77 from alex-sherwin/master

#76 - added support for pubsub wildcard topics
pull/179/head
Nikita Koksharov 10 years ago
commit 003bd0a185

@ -81,10 +81,6 @@ public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
dispatch(PSUBSCRIBE, new PubSubOutput<K, V>(codec), args(patterns));
}
public void punsubscribe(String... patterns) {
dispatch(PUNSUBSCRIBE, new PubSubOutput<K, V>(codec), args(patterns));
}
public void subscribe(String... channels) {
dispatch(SUBSCRIBE, new PubSubOutput<K, V>(codec), args(channels));
}
@ -93,7 +89,12 @@ public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
return dispatch(UNSUBSCRIBE, new PubSubOutput<K, V>(codec), args(channels));
}
@Override
public Future<V> punsubscribe(String... patterns) {
return dispatch(PUNSUBSCRIBE, new PubSubOutput<K, V>(codec), args(patterns));
}
@Override
public synchronized void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);

@ -49,6 +49,11 @@ public class RedisPubSubTopicListenerWrapper<V> extends RedisPubSubAdapter<V> {
}
}
@Override
public void message(String pattern, String channel, V message) {
listener.onMessage(message);
}
@Override
public int hashCode() {
final int prime = 31;

@ -71,6 +71,44 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
return addListener(pubSubListener);
}
@Override
public int addPListener(MessageListener<M> listener) {
RedisPubSubTopicListenerWrapper<M> pubSubListener = new RedisPubSubTopicListenerWrapper<M>(listener, getName());
return addPListener(pubSubListener);
}
private int addPListener(RedisPubSubTopicListenerWrapper<M> pubSubListener) {
PubSubConnectionEntry entry = connectionManager.psubscribe(getName());
synchronized (entry) {
if (entry.isActive()) {
entry.addListener(getName(), pubSubListener);
return pubSubListener.hashCode();
}
}
// entry is inactive trying add again
return addPListener(pubSubListener);
}
@Override
public void removePListener(int listenerId) {
PubSubConnectionEntry entry = connectionManager.getEntry(getName());
if (entry == null) {
return;
}
synchronized (entry) {
if (entry.isActive()) {
entry.removeListener(getName(), listenerId);
if (entry.getListeners(getName()).isEmpty()) {
connectionManager.punsubscribe(getName());
}
return;
}
}
// entry is inactive trying add again
removePListener(listenerId);
}
@Override
public void removeListener(int listenerId) {
PubSubConnectionEntry entry = connectionManager.getEntry(getName());

@ -76,10 +76,14 @@ public interface ConnectionManager {
<K, V> PubSubConnectionEntry subscribe(String channelName);
<K, V> PubSubConnectionEntry psubscribe(String pattern);
<K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<V> listener, String channelName);
Future unsubscribe(String channelName);
Future punsubscribe(String channelName);
void releaseRead(int slot, RedisConnection сonnection);
void shutdown();

@ -571,6 +571,55 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
@Override
public <K, V> PubSubConnectionEntry psubscribe(String channelName) {
// multiple channel names per PubSubConnections allowed
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
return сonnEntry;
}
Set<PubSubConnectionEntry> entries = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
for (PubSubConnectionEntry entry : entries) {
if (entry.tryAcquire()) {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
return oldEntry;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return psubscribe(channelName);
}
entry.psubscribe(channelName);
return entry;
}
}
}
int slot = -1;
RedisPubSubConnection<K, V> conn = nextPubSubConnection(slot);
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
returnSubscribeConnection(slot, entry);
return oldEntry;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return psubscribe(channelName);
}
entry.psubscribe(channelName);
return entry;
}
}
@Override
public <K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<V> listener, String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
@ -639,6 +688,27 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return future;
}
@Override
public Future punsubscribe(String channelName) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return group.next().newSucceededFuture(null);
}
Future future = entry.punsubscribe(channelName);
future.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
synchronized (entry) {
if (entry.tryClose()) {
returnSubscribeConnection(-1, entry);
}
}
}
});
return future;
}
protected MasterSlaveEntry getEntry() {
return getEntry(0);
}

@ -138,6 +138,21 @@ public class PubSubConnectionEntry {
conn.subscribe(channelName);
}
public void psubscribe(final String pattern) {
conn.addListener(new RedisPubSubAdapter() {
@Override
public void psubscribed(String channel, long count) {
log.debug("psubscribed to '{}' pattern", pattern);
}
@Override
public void punsubscribed(String channel, long count) {
log.debug("punsubscribed from '{}' pattern", pattern);
}
});
conn.psubscribe(pattern);
}
public void subscribe(RedisPubSubAdapter listener, String channel) {
addListener(channel, listener);
@ -162,6 +177,25 @@ public class PubSubConnectionEntry {
return future;
}
public Future punsubscribe(final String channel) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel);
if (listeners != null) {
for (RedisPubSubListener listener : listeners) {
removeListener(channel, listener);
}
}
Future future = conn.punsubscribe(channel);
future.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
subscribedChannelsAmount.release();
}
});
return future;
}
public boolean tryClose() {
if (subscribedChannelsAmount.tryAcquire(subscriptionsPerConnection)) {
close();

@ -53,6 +53,17 @@ public interface RTopic<M> extends RObject {
*/
int addListener(MessageListener<M> listener);
/**
* Subscribes to this topic using a pattern
* <code>MessageListener.onMessage</code> is called when any message
* is published on this topic.
*
* @param listener
* @return locally unique listener id
* @see org.redisson.core.MessageListener
*/
int addPListener(MessageListener<M> listener);
/**
* Removes the listener by <code>id</code> for listening this topic
*
@ -60,4 +71,11 @@ public interface RTopic<M> extends RObject {
*/
void removeListener(int listenerId);
/**
* Removes the pattern listener by <code>id</code> for listening this topic
*
* @param listenerId
*/
void removePListener(int listenerId);
}

@ -0,0 +1,182 @@
package org.redisson;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.MessageListener;
import org.redisson.core.RTopic;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class RedissonTopicPatternTest {
public static class Message {
private String name;
public Message() {
}
public Message(String name) {
this.name = name;
}
public String getName() {
return name;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Message other = (Message) obj;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
return false;
return true;
}
@Override
public String toString() {
return "Message{" +
"name='" + name + '\'' +
'}';
}
}
@Test
public void testUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
Redisson redisson = Redisson.create();
RTopic<Message> topic1 = redisson.getTopic("topic1.*");
int listenerId = topic1.addPListener(new MessageListener<Message>() {
@Override
public void onMessage(Message msg) {
Assert.fail();
}
});
topic1.addPListener(new MessageListener<Message>() {
@Override
public void onMessage(Message msg) {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
}
});
topic1.removePListener(listenerId);
topic1 = redisson.getTopic("topic1.*");
topic1.publish(new Message("123"));
messageRecieved.await();
redisson.shutdown();
}
@Test
public void testLazyUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
Redisson redisson1 = Redisson.create();
RTopic<Message> topic1 = redisson1.getTopic("topic.*");
int listenerId = topic1.addPListener(new MessageListener<Message>() {
@Override
public void onMessage(Message msg) {
Assert.fail();
}
});
Thread.sleep(1000);
topic1.removePListener(listenerId);
Thread.sleep(1000);
Redisson redisson2 = Redisson.create();
RTopic<Message> topic2 = redisson2.getTopic("topic.*");
topic2.addPListener(new MessageListener<Message>() {
@Override
public void onMessage(Message msg) {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
}
});
RTopic<Message> topic3 = redisson2.getTopic("topic.t3");
topic3.publish(new Message("123"));
messageRecieved.await();
redisson1.shutdown();
redisson2.shutdown();
}
@Test
public void test() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(5);
Redisson redisson1 = Redisson.create();
RTopic<Message> topic1 = redisson1.getTopic("topic.*");
topic1.addPListener(new MessageListener<Message>() {
@Override
public void onMessage(Message msg) {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
}
});
Redisson redisson2 = Redisson.create();
RTopic<Message> topic2 = redisson2.getTopic("topic.t1");
topic2.addListener(new MessageListener<Message>() {
@Override
public void onMessage(Message msg) {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
}
});
topic2.publish(new Message("123"));
topic2.publish(new Message("123"));
RTopic<Message> topicz = redisson2.getTopic("topicz.t1");
topicz.publish(new Message("789")); // this message doesn't get delivered, and would fail the assertion
RTopic<Message> topict2 = redisson2.getTopic("topic.t2");
topict2.publish(new Message("123"));
Assert.assertTrue(messageRecieved.await(5, TimeUnit.SECONDS));
redisson1.shutdown();
redisson2.shutdown();
}
@Test
public void testListenerRemove() throws InterruptedException {
Redisson redisson1 = Redisson.create();
RTopic<Message> topic1 = redisson1.getTopic("topic.*");
int id = topic1.addPListener(new MessageListener<Message>() {
@Override
public void onMessage(Message msg) {
Assert.fail();
}
});
Redisson redisson2 = Redisson.create();
RTopic<Message> topic2 = redisson2.getTopic("topic.t1");
topic1.removePListener(id);
topic2.publish(new Message("123"));
Thread.sleep(1000);
redisson1.shutdown();
redisson2.shutdown();
}
}
Loading…
Cancel
Save