pubsub fixed. #33

pull/38/head
Nikita 11 years ago
parent eb38da41a8
commit 97eb7771ff

@ -38,9 +38,13 @@ public class PubSubCommandHandler<K, V> extends CommandHandler<K, V> {
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
while (output.type() == null && !queue.isEmpty()) {
CommandOutput<K, V, ?> output = queue.peek().getOutput();
if (!rsm.decode(buffer, output)) return;
if (!rsm.decode(buffer, output)) {
return;
}
queue.take().complete();
if (output instanceof PubSubOutput) ctx.fireChannelRead(output);
if (output instanceof PubSubOutput && ((PubSubOutput) output).type() != null) {
ctx.fireChannelRead(output);
}
}
while (rsm.decode(buffer, output)) {

@ -26,7 +26,7 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
* @param <K>
* @param <V>
*/
public class RedisPubSubTopicListenerWrapper<V> extends RedisPubSubAdapter<String, V> {
public class RedisPubSubTopicListenerWrapper<K, V> extends RedisPubSubAdapter<K, V> {
private final MessageListener<V> listener;
private final String name;
@ -42,7 +42,7 @@ public class RedisPubSubTopicListenerWrapper<V> extends RedisPubSubAdapter<Strin
}
@Override
public void message(String channel, V message) {
public void message(K channel, V message) {
// could be subscribed to multiple channels
if (name.equals(channel)) {
listener.onMessage(message);

@ -50,7 +50,7 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
@Override
public int addListener(MessageListener<M> listener) {
RedisPubSubTopicListenerWrapper<M> pubSubListener = new RedisPubSubTopicListenerWrapper<M>(listener, getName());
RedisPubSubTopicListenerWrapper<String, M> pubSubListener = new RedisPubSubTopicListenerWrapper<String, M>(listener, getName());
PubSubConnectionEntry entry = connectionManager.subscribe(getName());
synchronized (entry) {
entry.addListener(pubSubListener);

@ -40,7 +40,7 @@ public class SingleConnectionConfig extends BaseConfig<SingleConnectionConfig> {
SingleConnectionConfig() {
}
public SingleConnectionConfig(SingleConnectionConfig config) {
SingleConnectionConfig(SingleConnectionConfig config) {
super(config);
setAddress(config.getAddress());
setConnectionPoolSize(config.getConnectionPoolSize());

@ -61,7 +61,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private LoadBalancer balancer;
private final List<RedisClient> slaveClients = new ArrayList<RedisClient>();
protected RedisClient masterClient;
protected volatile RedisClient masterClient;
private Semaphore masterConnectionsSemaphore;
@ -95,6 +95,18 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
masterConnectionsSemaphore = new Semaphore(this.config.getMasterConnectionPoolSize());
}
public void changeMaster(String host, int port) {
// TODO async
masterClient.shutdown();
masterClient = new RedisClient(group, host, port);
// TODO
// 1. remove slave
// 2. re-attach listeners
// 3. remove dead slave
}
@Override
public <T> FutureListener<T> createReleaseWriteListener(final RedisConnection conn) {
return new FutureListener<T>() {
@Override
@ -104,6 +116,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
};
}
@Override
public <T> FutureListener<T> createReleaseReadListener(final RedisConnection conn) {
return new FutureListener<T>() {
@Override
@ -113,6 +126,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
};
}
@Override
public <K, V> RedisConnection<K, V> connectionWriteOp() {
acquireMasterConnection();
@ -126,15 +140,19 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return conn;
}
@Override
public <K, V> RedisConnection<K, V> connectionReadOp() {
return balancer.nextConnection();
}
@Override
public PubSubConnectionEntry getEntry(String channelName) {
return name2PubSubConnection.get(channelName);
}
@Override
public <K, V> PubSubConnectionEntry subscribe(String channelName) {
// multiple channel names per PubSubConnections allowed
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
return сonnEntry;
@ -158,6 +176,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
returnSubscribeConnection(entry);
return oldEntry;
}
entry.subscribe(channelName);
@ -169,6 +188,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return balancer.nextPubSubConnection();
}
@Override
public <K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<K, V> listener, String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
@ -193,6 +213,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
returnSubscribeConnection(entry);
return oldEntry;
}
entry.subscribe(listener, channelName);
@ -214,13 +235,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
masterConnectionsSemaphore.release();
}
@Override
public void unsubscribe(PubSubConnectionEntry entry, String channelName) {
if (entry.hasListeners(channelName)) {
return;
}
name2PubSubConnection.remove(channelName);
entry.unsubscribe(channelName);
log.debug("unsubscribed from '{}' channel", channelName);
if (entry.tryClose()) {
pubSubConnections.remove(entry);
returnSubscribeConnection(entry);
@ -240,6 +261,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
balancer.returnConnection(сonnection);
}
@Override
public void shutdown() {
masterClient.shutdown();
for (RedisClient client : slaveClients) {
@ -247,6 +269,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
@Override
public EventLoopGroup getGroup() {
return group;
}

@ -30,7 +30,7 @@ public class PubSubConnectionEntry {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Semaphore semaphore;
private final Semaphore subscribedChannelsAmount;
private final RedisPubSubConnection conn;
private final int subscriptionsPerConnection;
@ -38,7 +38,7 @@ public class PubSubConnectionEntry {
super();
this.conn = conn;
this.subscriptionsPerConnection = subscriptionsPerConnection;
this.semaphore = new Semaphore(subscriptionsPerConnection);
this.subscribedChannelsAmount = new Semaphore(subscriptionsPerConnection);
}
public void addListener(RedisPubSubListener listener) {
@ -82,18 +82,24 @@ public class PubSubConnectionEntry {
}
public boolean tryAcquire() {
return semaphore.tryAcquire();
return subscribedChannelsAmount.tryAcquire();
}
public void release() {
semaphore.release();
subscribedChannelsAmount.release();
}
public void subscribe(final String channelName) {
conn.addListener(new RedisPubSubAdapter() {
public void subscribed(String channel, long count) {
@Override
public void subscribed(Object channel, long count) {
log.debug("subscribed to '{}' channel", channelName);
}
@Override
public void unsubscribed(Object channel, long count) {
log.debug("unsubscribed from '{}' channel", channelName);
}
});
conn.subscribe(channelName);
}
@ -106,11 +112,11 @@ public class PubSubConnectionEntry {
public void unsubscribe(String channel) {
conn.unsubscribe(channel);
semaphore.release();
subscribedChannelsAmount.release();
}
public boolean tryClose() {
if (semaphore.tryAcquire(subscriptionsPerConnection)) {
if (subscribedChannelsAmount.tryAcquire(subscriptionsPerConnection)) {
conn.close();
return true;
}

@ -0,0 +1,28 @@
package org.redisson;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RBucket;
public class RedissonBucketTest extends BaseTest {
@Test
public void testSetGet() {
RBucket<String> bucket = redisson.getBucket("test");
Assert.assertNull(bucket.get());
String value = "somevalue";
bucket.set(value);
Assert.assertEquals(value, bucket.get());
}
@Test
public void testSetDelete() {
RBucket<String> bucket = redisson.getBucket("test");
String value = "somevalue";
bucket.set(value);
Assert.assertEquals(value, bucket.get());
bucket.delete();
Assert.assertNull(bucket.get());
}
}

@ -43,6 +43,40 @@ public class RedissonTopicTest {
}
@Test
public void testLazyUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
Redisson redisson1 = Redisson.create();
RTopic<Message> topic1 = redisson1.getTopic("topic");
int listenerId = topic1.addListener(new MessageListener<Message>() {
@Override
public void onMessage(Message msg) {
Assert.fail();
}
});
Thread.sleep(1000);
topic1.removeListener(listenerId);
Thread.sleep(1000);
Redisson redisson2 = Redisson.create();
RTopic<Message> topic2 = redisson2.getTopic("topic");
topic2.addListener(new MessageListener<Message>() {
@Override
public void onMessage(Message msg) {
Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown();
}
});
topic2.publish(new Message("123"));
messageRecieved.await();
redisson1.shutdown();
redisson2.shutdown();
}
@Test
public void test() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(2);

Loading…
Cancel
Save