diff --git a/src/main/java/org/redisson/client/RedisPubSubConnection.java b/src/main/java/org/redisson/client/RedisPubSubConnection.java index 8b39732e4..fc1b1d8a0 100644 --- a/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -69,8 +69,8 @@ public class RedisPubSubConnection extends RedisConnection { return async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channel); } - public Future psubscribe(String ... channel) { - return async(new PubSubPatternMessageDecoder(), RedisCommands.PSUBSCRIBE, channel); + public Future psubscribe(Codec codec, String ... channel) { + return async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channel); } public Future unsubscribe(String ... channel) { diff --git a/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java b/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java index 02fb8aa50..a6bf4bf4b 100644 --- a/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java +++ b/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java @@ -15,20 +15,26 @@ */ package org.redisson.client.protocol.pubsub; +import java.io.IOException; import java.util.List; +import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.decoder.MultiDecoder; import io.netty.buffer.ByteBuf; -import io.netty.util.CharsetUtil; public class PubSubPatternMessageDecoder implements MultiDecoder { + private final Decoder decoder; + + public PubSubPatternMessageDecoder(Decoder decoder) { + super(); + this.decoder = decoder; + } + @Override - public Object decode(ByteBuf buf) { - String status = buf.toString(CharsetUtil.UTF_8); - buf.skipBytes(2); - return status; + public Object decode(ByteBuf buf) throws IOException { + return decoder.decode(buf); } @Override