Multiple channels pubsub fixed

pull/243/head
Nikita 10 years ago
parent 61a0efef32
commit c13edde71c

@ -16,6 +16,7 @@
package org.redisson;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
import org.redisson.core.MessageListener;
/**
@ -78,4 +79,8 @@ public class RedisPubSubTopicListenerWrapper<V> implements RedisPubSubListener<V
listener.onMessage(message);
}
@Override
public void onStatus(Type type, String channel) {
}
}

@ -20,14 +20,14 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
import org.redisson.core.RCountDownLatch;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
@ -72,7 +72,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return oldPromise;
}
RedisPubSubListener<Integer> listener = new RedisPubSubListener<Integer>() {
RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() {
@Override
public void onMessage(String channel, Integer message) {
@ -88,25 +88,17 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
@Override
public void onPatternMessage(String pattern, String channel, Integer message) {
// TODO Auto-generated method stub
public void onStatus(Type type, String channel) {
if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true);
}
}
};
Future<PubSubStatusMessage> res = null;
synchronized (ENTRIES) {
res = commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
}
res.addListener(new FutureListener<PubSubStatusMessage>() {
@Override
public void operationComplete(Future<PubSubStatusMessage> future) throws Exception {
if (!value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true);
}
}
});
return newPromise;
}

@ -21,15 +21,15 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
import org.redisson.core.RLock;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
@ -119,7 +119,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return oldPromise;
}
RedisPubSubListener<Integer> listener = new RedisPubSubListener<Integer>() {
RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() {
@Override
public void onMessage(String channel, Integer message) {
@ -129,23 +129,17 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
@Override
public void onPatternMessage(String pattern, String channel, Integer message) {
public void onStatus(Type type, String channel) {
if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true);
}
}
};
Future<PubSubStatusMessage> res = null;
synchronized (ENTRIES) {
res = commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
}
res.addListener(new FutureListener<PubSubStatusMessage>() {
@Override
public void operationComplete(Future<PubSubStatusMessage> future) throws Exception {
if (!value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true);
}
}
});
return newPromise;
}

@ -0,0 +1,34 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
public class BaseRedisPubSubListener<V> implements RedisPubSubListener<V> {
@Override
public void onStatus(Type type, String channel) {
}
@Override
public void onMessage(String channel, V message) {
}
@Override
public void onPatternMessage(String pattern, String channel, V message) {
}
}

@ -0,0 +1,45 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
public class OnceRedisPubSubListener<V> implements RedisPubSubListener<V> {
private RedisPubSubConnection connection;
private RedisPubSubListener<V> listener;
public OnceRedisPubSubListener(RedisPubSubConnection connection, RedisPubSubListener<V> listener) {
super();
this.connection = connection;
this.listener = listener;
}
@Override
public void onStatus(Type type, String channel) {
listener.onStatus(type, channel);
connection.removeListener(this);
}
@Override
public void onMessage(String channel, V message) {
}
@Override
public void onPatternMessage(String pattern, String channel, V message) {
}
}

@ -16,15 +16,12 @@
package org.redisson.client;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import org.redisson.client.handler.CommandDecoder;
import org.redisson.client.handler.CommandEncoder;
import org.redisson.client.handler.CommandsListEncoder;
import org.redisson.client.handler.CommandsQueue;
import org.redisson.client.handler.ConnectionWatchdog;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@ -110,65 +107,5 @@ public class RedisClient {
return channels.close();
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
final RedisClient c = new RedisClient("127.0.0.1", 6379);
Object r = c.connect().sync(new StringCodec(), RedisCommands.GET, "test1");
System.out.println(r);
// final RedisClient c = new RedisClient("127.0.0.1", 26379);
// RedisConnection rc = c.connect();
// List<String> res4 = rc.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, "mymaster");
// System.out.println("r: " + res4);
//
// List<Map<String, String>> res5 = rc.sync(RedisCommands.SENTINEL_SLAVES, "mymaster");
// System.out.println("r: " + res5);
/* RedisPubSubConnection rpsc = c.connectPubSub();
rc.sync(new StringCodec(), RedisCommands.HMSET, "test", "1", "2");
rc.sync(new StringCodec(), RedisCommands.HMSET, "test", "2", "3");
List<String> r = rc.sync(new StringCodec(), RedisCommands.HMGET, "test", "1", "2");
String res1 = rc.sync(RedisCommands.CLIENT_SETNAME, "12333");
System.out.println("res 12: " + res1);
String res2 = rc.sync(RedisCommands.CLIENT_GETNAME);
System.out.println("res name: " + res2);
// Boolean res3 = rc.sync(new StringCodec(), RedisCommands.EXISTS, "33");
// System.out.println("res name 2: " + res3);
Long m = rc.sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "123");
System.out.println("out: " + m);
Future<PubSubStatusMessage> m1 = rpsc.psubscribe("ss*");
System.out.println("out: " + m1.get());
Future<PubSubStatusMessage> m2 = rpsc.psubscribe("ss*");
System.out.println("out: " + m2.get());
rpsc.addListener(new RedisPubSubListener<String>() {
@Override
public void onMessage(String channel, String message) {
System.out.println("incoming message: " + message);
}
@Override
public void onPatternMessage(String pattern, String channel, String message) {
System.out.println("incoming pattern pattern: " + pattern
+ " channel: " + channel + " message: " + message);
}
});
final RedisClient c2 = new RedisClient("127.0.0.1", 6379);
Long res = c2.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444");
System.out.println("published: " + res);
Future<PubSubStatusMessage> m3 = rpsc.punsubscribe("ss*");
System.out.println("punsubscribe out: " + m3.get());
final RedisClient c3 = new RedisClient("127.0.0.1", 6379);
Long res3 = c3.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444");
System.out.println("published: " + res3);
*/ }
}

@ -15,7 +15,7 @@
*/
package org.redisson.client;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.client.protocol.Codec;
@ -30,12 +30,10 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
public class RedisPubSubConnection extends RedisConnection {
final ConcurrentLinkedQueue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();
final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();
public RedisPubSubConnection(RedisClient redisClient, Channel channel) {
super(redisClient, channel);
@ -45,10 +43,20 @@ public class RedisPubSubConnection extends RedisConnection {
listeners.add(listener);
}
public void addOneShotListener(RedisPubSubListener listener) {
listeners.add(new OnceRedisPubSubListener<Object>(this, listener));
}
public void removeListener(RedisPubSubListener<?> listener) {
listeners.remove(listener);
}
public void onMessage(PubSubStatusMessage message) {
for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
redisPubSubListener.onStatus(message.getType(), message.getChannel());
}
}
public void onMessage(PubSubMessage message) {
for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
redisPubSubListener.onMessage(message.getChannel(), message.getValue());
@ -61,26 +69,24 @@ public class RedisPubSubConnection extends RedisConnection {
}
}
public Future<List<PubSubStatusMessage>> subscribe(Codec codec, String ... channel) {
return async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channel);
public void subscribe(Codec codec, String ... channel) {
async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channel);
}
public Future<List<PubSubStatusMessage>> psubscribe(Codec codec, String ... channel) {
return async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channel);
public void psubscribe(Codec codec, String ... channel) {
async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channel);
}
public Future<List<PubSubStatusMessage>> unsubscribe(String ... channel) {
return async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channel);
public void unsubscribe(String ... channel) {
async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channel);
}
public Future<List<PubSubStatusMessage>> punsubscribe(String ... channel) {
return async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channel);
public void punsubscribe(String ... channel) {
async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channel);
}
public <T, R> Future<R> async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
channel.writeAndFlush(new CommandData<T, R>(promise, messageDecoder, null, command, params));
return promise;
private <T, R> void async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {
channel.writeAndFlush(new CommandData<T, R>(null, messageDecoder, null, command, params));
}
}

@ -15,8 +15,12 @@
*/
package org.redisson.client;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
public interface RedisPubSubListener<V> {
void onStatus(PubSubStatusMessage.Type type, String channel);
void onMessage(String channel, V message);
void onPatternMessage(String pattern, String channel, V message);

@ -205,28 +205,25 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (data == null) {
if (result instanceof PubSubStatusMessage) {
String channelName = ((PubSubStatusMessage) result).getChannel();
data = channels.get(channelName);
}
}
if (data != null) {
if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(data.getCommand().getName())) {
for (Object param : data.getParams()) {
channels.remove(param.toString());
messageDecoders.put(param.toString(), data.getMessageDecoder());
CommandData<Object, Object> d = channels.get(channelName);
if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(d.getCommand().getName())) {
channels.remove(channelName);
messageDecoders.put(channelName, d.getMessageDecoder());
}
}
if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(data.getCommand().getName())) {
for (Object param : data.getParams()) {
channels.remove(param.toString());
messageDecoders.remove(param.toString());
if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(d.getCommand().getName())) {
channels.remove(channelName);
messageDecoders.remove(channelName);
}
}
}
if (data != null) {
handleResult(data, parts, result, true);
} else {
RedisPubSubConnection pubSubConnection = (RedisPubSubConnection)channel.attr(RedisPubSubConnection.CONNECTION).get();
if (result instanceof PubSubMessage) {
if (result instanceof PubSubStatusMessage) {
pubSubConnection.onMessage((PubSubStatusMessage) result);
} else if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
} else {
pubSubConnection.onMessage((PubSubPatternMessage) result);

@ -72,7 +72,9 @@ public class CommandsQueue extends ChannelDuplexHandler {
List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
if (!pubSubOps.isEmpty()) {
for (CommandData<Object, Object> cd : pubSubOps) {
ctx.pipeline().get(CommandDecoder.class).addChannel((String)cd.getParams()[0], cd);
for (Object channel : cd.getParams()) {
ctx.pipeline().get(CommandDecoder.class).addChannel(channel.toString(), cd);
}
}
} else {
ctx.channel().attr(REPLAY).set(data);

@ -78,11 +78,11 @@ public interface ConnectionManager {
PubSubConnectionEntry psubscribe(String pattern);
<V> Future<PubSubStatusMessage> subscribe(RedisPubSubListener<V> listener, String channelName);
<V> void subscribe(RedisPubSubListener<V> listener, String channelName);
Future unsubscribe(String channelName);
void unsubscribe(String channelName);
Future punsubscribe(String channelName);
void punsubscribe(String channelName);
void shutdown();

@ -15,10 +15,8 @@
*/
package org.redisson.connection;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
@ -29,12 +27,12 @@ import java.util.concurrent.TimeUnit;
import org.redisson.Config;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.slf4j.Logger;
@ -49,7 +47,6 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
@ -282,10 +279,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public Future<List<PubSubStatusMessage>> subscribe(RedisPubSubListener listener, String channelName) {
public void subscribe(RedisPubSubListener listener, String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
return сonnEntry.subscribe(codec, listener, channelName);
сonnEntry.subscribe(codec, listener, channelName);
return;
}
Set<PubSubConnectionEntry> entries = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
@ -294,14 +292,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
return group.next().newSucceededFuture(Arrays.asList(new PubSubStatusMessage(Type.SUBSCRIBE, channelName)));
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return subscribe(listener, channelName);
subscribe(listener, channelName);
return;
}
return entry.subscribe(codec, listener, channelName);
entry.subscribe(codec, listener, channelName);
return;
}
}
}
@ -314,57 +314,63 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
returnSubscribeConnection(slot, entry);
return group.next().newSucceededFuture(Arrays.asList(new PubSubStatusMessage(Type.SUBSCRIBE, channelName)));
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return subscribe(listener, channelName);
subscribe(listener, channelName);
return;
}
return entry.subscribe(codec, listener, channelName);
entry.subscribe(codec, listener, channelName);
return;
}
}
@Override
public Future<List<PubSubStatusMessage>> unsubscribe(String channelName) {
public void unsubscribe(final String channelName) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return group.next().newSucceededFuture(null);
return;
}
Future<List<PubSubStatusMessage>> future = entry.unsubscribe(channelName);
future.addListener(new FutureListener<List<PubSubStatusMessage>>() {
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public void operationComplete(Future<List<PubSubStatusMessage>> future) throws Exception {
synchronized (entry) {
if (entry.tryClose()) {
returnSubscribeConnection(-1, entry);
public void onStatus(Type type, String channel) {
if (type == Type.UNSUBSCRIBE && channel.equals(channelName)) {
synchronized (entry) {
if (entry.tryClose()) {
returnSubscribeConnection(-1, entry);
}
}
}
}
});
return future;
}
@Override
public Future<List<PubSubStatusMessage>> punsubscribe(String channelName) {
public void punsubscribe(final String channelName) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return group.next().newSucceededFuture(null);
return;
}
Future<List<PubSubStatusMessage>> future = entry.punsubscribe(channelName);
future.addListener(new FutureListener<List<PubSubStatusMessage>>() {
entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public void operationComplete(Future<List<PubSubStatusMessage>> future) throws Exception {
synchronized (entry) {
if (entry.tryClose()) {
returnSubscribeConnection(-1, entry);
public void onStatus(Type type, String channel) {
if (type == Type.PUNSUBSCRIBE && channel.equals(channelName)) {
synchronized (entry) {
if (entry.tryClose()) {
returnSubscribeConnection(-1, entry);
}
}
}
}
});
return future;
}
protected MasterSlaveEntry getEntry() {

@ -15,23 +15,20 @@
*/
package org.redisson.connection;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -125,66 +122,54 @@ public class PubSubConnectionEntry {
}
public void subscribe(Codec codec, final String channelName) {
Future<List<PubSubStatusMessage>> result = conn.subscribe(codec, channelName);
result.addListener(new FutureListener<List<PubSubStatusMessage>>() {
@Override
public void operationComplete(Future<List<PubSubStatusMessage>> future) throws Exception {
if (future.isSuccess()) {
log.debug("subscribed to '{}' channel on server '{}'", channelName, conn.getRedisClient().getAddr());
}
}
});
conn.subscribe(codec, channelName);
}
public void psubscribe(Codec codec, final String pattern) {
Future<List<PubSubStatusMessage>> result = conn.psubscribe(codec, pattern);
result.addListener(new FutureListener<List<PubSubStatusMessage>>() {
@Override
public void operationComplete(Future<List<PubSubStatusMessage>> future) throws Exception {
log.debug("punsubscribed from '{}' pattern on server '{}'", pattern, conn.getRedisClient().getAddr());
}
});
conn.psubscribe(codec, pattern);
}
public Future<List<PubSubStatusMessage>> subscribe(Codec codec, RedisPubSubListener listener, String channel) {
public void subscribe(Codec codec, RedisPubSubListener listener, String channel) {
addListener(channel, listener);
return conn.subscribe(codec, channel);
conn.subscribe(codec, channel);
}
public Future<List<PubSubStatusMessage>> unsubscribe(final String channel) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel);
if (listeners != null) {
for (RedisPubSubListener listener : listeners) {
removeListener(channel, listener);
}
}
Future<List<PubSubStatusMessage>> future = conn.unsubscribe(channel);
future.addListener(new FutureListener<List<PubSubStatusMessage>>() {
public void unsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
@Override
public void operationComplete(Future<List<PubSubStatusMessage>> future) throws Exception {
subscribedChannelsAmount.release();
public void onStatus(Type type, String ch) {
if (type == Type.UNSUBSCRIBE && channel.equals(ch)) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel);
if (listeners != null) {
for (RedisPubSubListener listener : listeners) {
removeListener(channel, listener);
}
}
subscribedChannelsAmount.release();
}
}
});
return future;
conn.addOneShotListener(listener);
conn.unsubscribe(channel);
}
public Future<List<PubSubStatusMessage>> punsubscribe(final String channel) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel);
if (listeners != null) {
for (RedisPubSubListener listener : listeners) {
removeListener(channel, listener);
}
}
Future<List<PubSubStatusMessage>> future = conn.punsubscribe(channel);
future.addListener(new FutureListener<List<PubSubStatusMessage>>() {
public void punsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
@Override
public void operationComplete(Future<List<PubSubStatusMessage>> future) throws Exception {
subscribedChannelsAmount.release();
public void onStatus(Type type, String ch) {
if (type == Type.PUNSUBSCRIBE && channel.equals(ch)) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel);
if (listeners != null) {
for (RedisPubSubListener listener : listeners) {
removeListener(channel, listener);
}
}
subscribedChannelsAmount.release();
}
}
});
return future;
conn.addOneShotListener(listener);
conn.punsubscribe(channel);
}

@ -34,6 +34,7 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -130,15 +131,16 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
@Override
public void onPatternMessage(String pattern, String channel, String message) {
}
});
Future<List<PubSubStatusMessage>> res = pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave");
res.addListener(new FutureListener<List<PubSubStatusMessage>>() {
@Override
public void operationComplete(Future<List<PubSubStatusMessage>> future) throws Exception {
log.info("subscribed to channels: {} from Sentinel {}:{}", future.getNow(), addr.getHost(), addr.getPort());
public void onStatus(Type type, String channel) {
if (type == Type.SUBSCRIBE) {
log.info("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort());
}
}
});
pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave");
}
}

@ -1,8 +1,10 @@
package org.redisson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -12,16 +14,48 @@ import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
public class RedisClientTest {
@Test
public void testSubscribe() throws InterruptedException {
RedisClient c = new RedisClient("localhost", 6379);
RedisPubSubConnection pubSubConnection = c.connectPubSub();
final CountDownLatch latch = new CountDownLatch(2);
pubSubConnection.addListener(new RedisPubSubListener<Object>() {
@Override
public void onStatus(Type type, String channel) {
Assert.assertEquals(Type.SUBSCRIBE, type);
Assert.assertTrue(Arrays.asList("test1", "test2").contains(channel));
latch.countDown();
}
@Override
public void onMessage(String channel, Object message) {
}
@Override
public void onPatternMessage(String pattern, String channel, Object message) {
}
});
pubSubConnection.subscribe(StringCodec.INSTANCE, "test1", "test2");
latch.await();
}
@Test
public void test() throws InterruptedException {
RedisClient c = new RedisClient("localhost", 6379);

Loading…
Cancel
Save