From 4783371ba4233aba77b8e189739a3ab4ffbc96ee Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 10 Jul 2015 12:34:48 +0300 Subject: [PATCH] PubSub handling --- .../java/org/redisson/client/RedisClient.java | 8 +++ .../client/RedisPubSubConnection.java | 4 ++ .../redisson/client/handler/RedisData.java | 11 ++-- .../redisson/client/handler/RedisDecoder.java | 62 ++++++++++++++++--- 4 files changed, 69 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/redisson/client/RedisClient.java b/src/main/java/org/redisson/client/RedisClient.java index 637712197..47a55248a 100644 --- a/src/main/java/org/redisson/client/RedisClient.java +++ b/src/main/java/org/redisson/client/RedisClient.java @@ -116,6 +116,8 @@ public class RedisClient { System.out.println("out: " + m); Future m1 = rpsc.psubscribe("ss*"); System.out.println("out: " + m1.get()); + Future m2 = rpsc.psubscribe("ss*"); + System.out.println("out: " + m2.get()); rpsc.addListener(new RedisPubSubListener() { @Override public void onMessage(String channel, String message) { @@ -135,6 +137,12 @@ public class RedisClient { Long res = c2.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444"); System.out.println("published: " + res); + Future m3 = rpsc.punsubscribe("ss*"); + System.out.println("punsubscribe out: " + m3.get()); + + final RedisClient c3 = new RedisClient("127.0.0.1", 6379); + Long res3 = c3.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444"); + System.out.println("published: " + res3); /* Future res = rc.execute(new StringCodec(), RedisCommands.SET, "test", "" + Math.random()); diff --git a/src/main/java/org/redisson/client/RedisPubSubConnection.java b/src/main/java/org/redisson/client/RedisPubSubConnection.java index 126b2405c..fee152892 100644 --- a/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -81,6 +81,10 @@ public class RedisPubSubConnection { return async(null, RedisCommands.UNSUBSCRIBE, channel); } + public Future punsubscribe(String ... channel) { + return async(null, RedisCommands.PUNSUBSCRIBE, channel); + } + // public Future async(Codec encoder, RedisCommand command, Object ... params) { // Promise promise = redisClient.getBootstrap().group().next().newPromise(); // channel.writeAndFlush(new RedisData(promise, encoder, command, params)); diff --git a/src/main/java/org/redisson/client/handler/RedisData.java b/src/main/java/org/redisson/client/handler/RedisData.java index ab38cca80..9c925ca78 100644 --- a/src/main/java/org/redisson/client/handler/RedisData.java +++ b/src/main/java/org/redisson/client/handler/RedisData.java @@ -18,7 +18,6 @@ package org.redisson.client.handler; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.client.protocol.Codec; -import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.pubsub.MultiDecoder; @@ -31,18 +30,18 @@ public class RedisData { final Object[] params; final Codec codec; final AtomicBoolean sended = new AtomicBoolean(); - final MultiDecoder nextDecoder; + final MultiDecoder messageDecoder; public RedisData(Promise promise, Codec codec, RedisCommand command, Object[] params) { this(promise, null, codec, command, params); } - public RedisData(Promise promise, MultiDecoder nextDecoder, Codec codec, RedisCommand command, Object[] params) { + public RedisData(Promise promise, MultiDecoder messageDecoder, Codec codec, RedisCommand command, Object[] params) { this.promise = promise; this.command = command; this.params = params; this.codec = codec; - this.nextDecoder = nextDecoder; + this.messageDecoder = messageDecoder; } public RedisCommand getCommand() { @@ -53,8 +52,8 @@ public class RedisData { return params; } - public MultiDecoder getNextDecoder() { - return nextDecoder; + public MultiDecoder getMessageDecoder() { + return messageDecoder; } public Promise getPromise() { diff --git a/src/main/java/org/redisson/client/handler/RedisDecoder.java b/src/main/java/org/redisson/client/handler/RedisDecoder.java index 8d9627e58..7ff23219b 100644 --- a/src/main/java/org/redisson/client/handler/RedisDecoder.java +++ b/src/main/java/org/redisson/client/handler/RedisDecoder.java @@ -18,7 +18,9 @@ package org.redisson.client.handler; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.redisson.client.RedisException; import org.redisson.client.RedisPubSubConnection; @@ -39,19 +41,29 @@ public class RedisDecoder extends ReplayingDecoder { public static final char LF = '\n'; private static final char ZERO = '0'; - private MultiDecoder nextDecoder; + private final Map> messageDecoders = new HashMap>(); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { RedisData data = ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove(); RedisPubSubConnection pubSubConnection = ctx.channel().attr(RedisPubSubConnection.CONNECTION).get(); - decode(in, data, null, pubSubConnection); + Decoder currentDecoder = null; + if (data == null) { + currentDecoder = new Decoder() { + @Override + public Object decode(ByteBuf buf) { + return buf.toString(CharsetUtil.UTF_8); + } + }; + } + + decode(in, data, null, pubSubConnection, currentDecoder); ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); } - private void decode(ByteBuf in, RedisData data, List parts, RedisPubSubConnection pubSubConnection) throws IOException { + private void decode(ByteBuf in, RedisData data, List parts, RedisPubSubConnection pubSubConnection, Decoder currentDecoder) throws IOException { int code = in.readByte(); // System.out.println("trying decode -- " + (char)code); if (code == '+') { @@ -66,19 +78,26 @@ public class RedisDecoder extends ReplayingDecoder { Object result = Long.valueOf(status); handleResult(data, parts, result); } else if (code == '$') { - Object result = decoder(data, parts != null).decode(readBytes(in)); + Object result = decoder(data, parts, currentDecoder).decode(readBytes(in)); handleResult(data, parts, result); } else if (code == '*') { long size = readLong(in); List respParts = new ArrayList(); for (int i = 0; i < size; i++) { - decode(in, data, respParts, pubSubConnection); + decode(in, data, respParts, pubSubConnection, currentDecoder); } - Object result = ((MultiDecoder)decoder(data, true)).decode(respParts); + Object result = messageDecoder(data, respParts).decode(respParts); if (data != null) { if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(data.getCommand().getName())) { - nextDecoder = data.getNextDecoder(); + for (Object param : data.getParams()) { + messageDecoders.put(param.toString(), data.getMessageDecoder()); + } + } + if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(data.getCommand().getName())) { + for (Object param : data.getParams()) { + messageDecoders.remove(param.toString()); + } } data.getPromise().setSuccess(result); } else { @@ -104,12 +123,35 @@ public class RedisDecoder extends ReplayingDecoder { } } - private Decoder decoder(RedisData data, boolean isMulti) { + private MultiDecoder messageDecoder(RedisData data, List parts) { if (data == null) { - return nextDecoder; + if (parts.get(0).equals("message")) { + String channelName = (String) parts.get(1); + return messageDecoders.get(channelName); + } + if (parts.get(0).equals("pmessage")) { + String patternName = (String) parts.get(1); + return messageDecoders.get(patternName); + } } + return data.getCommand().getReplayMultiDecoder(); + } + + private Decoder decoder(RedisData data, List parts, Decoder currentDecoder) { + if (data == null) { + if (parts.size() == 2 && parts.get(0).equals("message")) { + String channelName = (String) parts.get(1); + return messageDecoders.get(channelName); + } + if (parts.size() == 3 && parts.get(0).equals("pmessage")) { + String patternName = (String) parts.get(1); + return messageDecoders.get(patternName); + } + return currentDecoder; + } + Decoder decoder = data.getCommand().getReplayDecoder(); - if (isMulti) { + if (parts != null) { decoder = data.getCommand().getReplayMultiDecoder(); } if (decoder == null) {