PubSub handling

pull/243/head
Nikita 10 years ago
parent 6bb464e00f
commit 4783371ba4

@ -116,6 +116,8 @@ public class RedisClient {
System.out.println("out: " + m);
Future<PubSubStatusMessage> m1 = rpsc.psubscribe("ss*");
System.out.println("out: " + m1.get());
Future<PubSubStatusMessage> m2 = rpsc.psubscribe("ss*");
System.out.println("out: " + m2.get());
rpsc.addListener(new RedisPubSubListener<String>() {
@Override
public void onMessage(String channel, String message) {
@ -135,6 +137,12 @@ public class RedisClient {
Long res = c2.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444");
System.out.println("published: " + res);
Future<PubSubStatusMessage> m3 = rpsc.punsubscribe("ss*");
System.out.println("punsubscribe out: " + m3.get());
final RedisClient c3 = new RedisClient("127.0.0.1", 6379);
Long res3 = c3.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444");
System.out.println("published: " + res3);
/* Future<String> res = rc.execute(new StringCodec(), RedisCommands.SET, "test", "" + Math.random());

@ -81,6 +81,10 @@ public class RedisPubSubConnection {
return async(null, RedisCommands.UNSUBSCRIBE, channel);
}
public Future<PubSubStatusMessage> punsubscribe(String ... channel) {
return async(null, RedisCommands.PUNSUBSCRIBE, channel);
}
// public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
// Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
// channel.writeAndFlush(new RedisData<T, R>(promise, encoder, command, params));

@ -18,7 +18,6 @@ package org.redisson.client.handler;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.pubsub.MultiDecoder;
@ -31,18 +30,18 @@ public class RedisData<T, R> {
final Object[] params;
final Codec codec;
final AtomicBoolean sended = new AtomicBoolean();
final MultiDecoder<Object> nextDecoder;
final MultiDecoder<Object> messageDecoder;
public RedisData(Promise<R> promise, Codec codec, RedisCommand<T> command, Object[] params) {
this(promise, null, codec, command, params);
}
public RedisData(Promise<R> promise, MultiDecoder<Object> nextDecoder, Codec codec, RedisCommand<T> command, Object[] params) {
public RedisData(Promise<R> promise, MultiDecoder<Object> messageDecoder, Codec codec, RedisCommand<T> command, Object[] params) {
this.promise = promise;
this.command = command;
this.params = params;
this.codec = codec;
this.nextDecoder = nextDecoder;
this.messageDecoder = messageDecoder;
}
public RedisCommand<T> getCommand() {
@ -53,8 +52,8 @@ public class RedisData<T, R> {
return params;
}
public MultiDecoder<Object> getNextDecoder() {
return nextDecoder;
public MultiDecoder<Object> getMessageDecoder() {
return messageDecoder;
}
public Promise<R> getPromise() {

@ -18,7 +18,9 @@ package org.redisson.client.handler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
@ -39,19 +41,29 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
public static final char LF = '\n';
private static final char ZERO = '0';
private MultiDecoder<Object> nextDecoder;
private final Map<String, MultiDecoder<Object>> messageDecoders = new HashMap<String, MultiDecoder<Object>>();
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
RedisData<Object, Object> data = ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove();
RedisPubSubConnection pubSubConnection = ctx.channel().attr(RedisPubSubConnection.CONNECTION).get();
decode(in, data, null, pubSubConnection);
Decoder<Object> currentDecoder = null;
if (data == null) {
currentDecoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf) {
return buf.toString(CharsetUtil.UTF_8);
}
};
}
decode(in, data, null, pubSubConnection, currentDecoder);
ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
}
private void decode(ByteBuf in, RedisData<Object, Object> data, List<Object> parts, RedisPubSubConnection pubSubConnection) throws IOException {
private void decode(ByteBuf in, RedisData<Object, Object> data, List<Object> parts, RedisPubSubConnection pubSubConnection, Decoder<Object> currentDecoder) throws IOException {
int code = in.readByte();
// System.out.println("trying decode -- " + (char)code);
if (code == '+') {
@ -66,19 +78,26 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
Object result = Long.valueOf(status);
handleResult(data, parts, result);
} else if (code == '$') {
Object result = decoder(data, parts != null).decode(readBytes(in));
Object result = decoder(data, parts, currentDecoder).decode(readBytes(in));
handleResult(data, parts, result);
} else if (code == '*') {
long size = readLong(in);
List<Object> respParts = new ArrayList<Object>();
for (int i = 0; i < size; i++) {
decode(in, data, respParts, pubSubConnection);
decode(in, data, respParts, pubSubConnection, currentDecoder);
}
Object result = ((MultiDecoder<Object>)decoder(data, true)).decode(respParts);
Object result = messageDecoder(data, respParts).decode(respParts);
if (data != null) {
if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(data.getCommand().getName())) {
nextDecoder = data.getNextDecoder();
for (Object param : data.getParams()) {
messageDecoders.put(param.toString(), data.getMessageDecoder());
}
}
if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(data.getCommand().getName())) {
for (Object param : data.getParams()) {
messageDecoders.remove(param.toString());
}
}
data.getPromise().setSuccess(result);
} else {
@ -104,12 +123,35 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
}
}
private Decoder<Object> decoder(RedisData<Object, Object> data, boolean isMulti) {
private MultiDecoder<Object> messageDecoder(RedisData<Object, Object> data, List<Object> parts) {
if (data == null) {
return nextDecoder;
if (parts.get(0).equals("message")) {
String channelName = (String) parts.get(1);
return messageDecoders.get(channelName);
}
if (parts.get(0).equals("pmessage")) {
String patternName = (String) parts.get(1);
return messageDecoders.get(patternName);
}
}
return data.getCommand().getReplayMultiDecoder();
}
private Decoder<Object> decoder(RedisData<Object, Object> data, List<Object> parts, Decoder<Object> currentDecoder) {
if (data == null) {
if (parts.size() == 2 && parts.get(0).equals("message")) {
String channelName = (String) parts.get(1);
return messageDecoders.get(channelName);
}
if (parts.size() == 3 && parts.get(0).equals("pmessage")) {
String patternName = (String) parts.get(1);
return messageDecoders.get(patternName);
}
return currentDecoder;
}
Decoder<Object> decoder = data.getCommand().getReplayDecoder();
if (isMulti) {
if (parts != null) {
decoder = data.getCommand().getReplayMultiDecoder();
}
if (decoder == null) {

Loading…
Cancel
Save