From 7a35bdd31e49281bf4bd2a22d34fca0b1f7da4d0 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 9 Jul 2015 16:15:05 +0300 Subject: [PATCH] PubSub messaging handling --- .../java/org/redisson/client/RedisClient.java | 24 ++++++++- .../client/RedisPubSubConnection.java | 51 ++++++++++++++++-- .../redisson/client/RedisPubSubListener.java | 9 ++++ .../redisson/client/handler/RedisData.java | 16 ++++-- .../redisson/client/handler/RedisDecoder.java | 52 ++++++++++++++----- .../redisson/client/handler/RedisEncoder.java | 4 +- ...coder.java => BooleanReplayConvertor.java} | 17 ++---- .../redisson/client/protocol/Convertor.java | 7 +++ .../client/protocol/EmptyConvertor.java | 10 ++++ .../client/protocol/PubSubMessage.java | 16 +++--- .../client/protocol/PubSubMessageDecoder.java | 33 ++---------- .../client/protocol/PubSubPatternMessage.java | 33 ++++++++++++ .../protocol/PubSubPatternMessageDecoder.java | 22 ++++++++ .../client/protocol/PubSubStatusDecoder.java | 52 +++++++++++++++++++ .../client/protocol/PubSubStatusMessage.java | 31 +++++++++++ .../client/protocol/RedisCommand.java | 12 +++++ .../client/protocol/RedisCommands.java | 7 ++- .../redisson/RedissonBlockingQueueTest.java | 12 ++++- 18 files changed, 328 insertions(+), 80 deletions(-) create mode 100644 src/main/java/org/redisson/client/RedisPubSubListener.java rename src/main/java/org/redisson/client/protocol/{BooleanReplayDecoder.java => BooleanReplayConvertor.java} (59%) create mode 100644 src/main/java/org/redisson/client/protocol/Convertor.java create mode 100644 src/main/java/org/redisson/client/protocol/EmptyConvertor.java create mode 100644 src/main/java/org/redisson/client/protocol/PubSubPatternMessage.java create mode 100644 src/main/java/org/redisson/client/protocol/PubSubPatternMessageDecoder.java create mode 100644 src/main/java/org/redisson/client/protocol/PubSubStatusDecoder.java create mode 100644 src/main/java/org/redisson/client/protocol/PubSubStatusMessage.java diff --git a/src/main/java/org/redisson/client/RedisClient.java b/src/main/java/org/redisson/client/RedisClient.java index 658adf055..0f227798c 100644 --- a/src/main/java/org/redisson/client/RedisClient.java +++ b/src/main/java/org/redisson/client/RedisClient.java @@ -21,7 +21,9 @@ import java.util.concurrent.ExecutionException; import org.redisson.client.handler.RedisCommandsQueue; import org.redisson.client.handler.RedisDecoder; import org.redisson.client.handler.RedisEncoder; -import org.redisson.client.protocol.PubSubMessage; +import org.redisson.client.protocol.PubSubStatusMessage; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.StringCodec; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -112,8 +114,26 @@ public class RedisClient { Future m = rpsc.publish("sss", "123"); System.out.println("out: " + m.get()); - Future m1 = rpsc.subscribe("sss"); + Future m1 = rpsc.psubscribe("ss*"); System.out.println("out: " + m1.get()); + rpsc.addListener(new RedisPubSubListener() { + @Override + public void onMessage(String channel, String message) { + System.out.println("incoming message: " + message); + } + + @Override + public void onPatternMessage(String pattern, String channel, String message) { + System.out.println("incoming pattern pattern: " + pattern + + " channel: " + channel + " message: " + message); + + } + }); + + + final RedisClient c2 = new RedisClient("127.0.0.1", 6379); + Long res = c2.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444"); +// System.out.println("published: " + res); diff --git a/src/main/java/org/redisson/client/RedisPubSubConnection.java b/src/main/java/org/redisson/client/RedisPubSubConnection.java index 81ed41a86..e89bf52fa 100644 --- a/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -1,30 +1,68 @@ package org.redisson.client; +import java.util.concurrent.ConcurrentLinkedQueue; + import org.redisson.client.handler.RedisData; import org.redisson.client.protocol.Codec; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.PubSubStatusMessage; +import org.redisson.client.protocol.PubSubStatusDecoder; import org.redisson.client.protocol.PubSubMessage; -import org.redisson.client.protocol.PubSubMessageDecoder; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.StringCodec; +import org.redisson.client.protocol.PubSubMessageDecoder; +import org.redisson.client.protocol.PubSubPatternMessage; +import org.redisson.client.protocol.PubSubPatternMessageDecoder; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; public class RedisPubSubConnection { + public static final AttributeKey CONNECTION = AttributeKey.valueOf("connection"); + + final ConcurrentLinkedQueue> listeners = new ConcurrentLinkedQueue>(); + final Channel channel; final RedisClient redisClient; public RedisPubSubConnection(RedisClient redisClient, Channel channel) { this.redisClient = redisClient; this.channel = channel; + + channel.attr(CONNECTION).set(this); } - public Future subscribe(String ... channel) { - return async(new PubSubMessageDecoder(), RedisCommands.SUBSCRIBE, channel); + public void addListener(RedisPubSubListener listener) { + listeners.add(listener); + } + + public void onMessage(PubSubMessage message) { + for (RedisPubSubListener redisPubSubListener : listeners) { + redisPubSubListener.onMessage(message.getChannel(), message.getValue()); + } + } + + public void onMessage(PubSubPatternMessage message) { + for (RedisPubSubListener redisPubSubListener : listeners) { + redisPubSubListener.onPatternMessage(message.getPattern(), message.getChannel(), message.getValue()); + } + } + + public Future subscribe(String ... channel) { + return async(new PubSubStatusDecoder(), new PubSubMessageDecoder(), RedisCommands.SUBSCRIBE, channel); + } + + public Future psubscribe(String ... channel) { + return async(new PubSubStatusDecoder(), new PubSubPatternMessageDecoder(), RedisCommands.PSUBSCRIBE, channel); + } + + public Future unsubscribe(String ... channel) { + return async(new PubSubStatusDecoder(), RedisCommands.SUBSCRIBE, channel); } public Future publish(String channel, String msg) { @@ -37,6 +75,13 @@ public class RedisPubSubConnection { return promise; } + public Future async(Codec encoder, Decoder nextDecoder, RedisCommand command, Object ... params) { + Promise promise = redisClient.getBootstrap().group().next().newPromise(); + channel.writeAndFlush(new RedisData(promise, nextDecoder, encoder, command, params)); + return promise; + } + + public ChannelFuture closeAsync() { return channel.close(); } diff --git a/src/main/java/org/redisson/client/RedisPubSubListener.java b/src/main/java/org/redisson/client/RedisPubSubListener.java new file mode 100644 index 000000000..b4217ba92 --- /dev/null +++ b/src/main/java/org/redisson/client/RedisPubSubListener.java @@ -0,0 +1,9 @@ +package org.redisson.client; + +public interface RedisPubSubListener { + + void onMessage(String channel, V message); + + void onPatternMessage(String pattern, String channel, V message); + +} diff --git a/src/main/java/org/redisson/client/handler/RedisData.java b/src/main/java/org/redisson/client/handler/RedisData.java index 14e6acffa..c8fcf41eb 100644 --- a/src/main/java/org/redisson/client/handler/RedisData.java +++ b/src/main/java/org/redisson/client/handler/RedisData.java @@ -18,7 +18,7 @@ package org.redisson.client.handler; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.client.protocol.Codec; -import org.redisson.client.protocol.Encoder; +import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.RedisCommand; import io.netty.util.concurrent.Promise; @@ -30,12 +30,18 @@ public class RedisData { final Object[] params; final Codec codec; final AtomicBoolean sended = new AtomicBoolean(); + final Decoder nextDecoder; - public RedisData(Promise promise, Codec encoder, RedisCommand command, Object[] params) { + public RedisData(Promise promise, Codec codec, RedisCommand command, Object[] params) { + this(promise, null, codec, command, params); + } + + public RedisData(Promise promise, Decoder nextDecoder, Codec codec, RedisCommand command, Object[] params) { this.promise = promise; this.command = command; this.params = params; - this.codec = encoder; + this.codec = codec; + this.nextDecoder = nextDecoder; } public RedisCommand getCommand() { @@ -46,6 +52,10 @@ public class RedisData { return params; } + public Decoder getNextDecoder() { + return nextDecoder; + } + public Promise getPromise() { return promise; } diff --git a/src/main/java/org/redisson/client/handler/RedisDecoder.java b/src/main/java/org/redisson/client/handler/RedisDecoder.java index 5785b3d09..b0507fd0e 100644 --- a/src/main/java/org/redisson/client/handler/RedisDecoder.java +++ b/src/main/java/org/redisson/client/handler/RedisDecoder.java @@ -17,11 +17,15 @@ package org.redisson.client.handler; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.redisson.client.RedisException; +import org.redisson.client.RedisPubSubConnection; import org.redisson.client.handler.RedisCommandsQueue.QueueCommands; import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.PubSubMessage; +import org.redisson.client.protocol.PubSubPatternMessage; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -34,17 +38,21 @@ public class RedisDecoder extends ReplayingDecoder { public static final char LF = '\n'; private static final char ZERO = '0'; + private Decoder nextDecoder; + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { RedisData data = ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove(); + RedisPubSubConnection pubSubConnection = ctx.channel().attr(RedisPubSubConnection.CONNECTION).get(); - decode(in, data, null); + decode(in, data, null, pubSubConnection); ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); } - private void decode(ByteBuf in, RedisData data, List parts) throws IOException { + private void decode(ByteBuf in, RedisData data, List parts, RedisPubSubConnection pubSubConnection) throws IOException { int code = in.readByte(); +// System.out.println("trying decode -- " + (char)code); if (code == '+') { Object result = data.getCommand().getReponseDecoder().decode(in); if (parts != null) { @@ -58,18 +66,16 @@ public class RedisDecoder extends ReplayingDecoder { } else if (code == ':') { String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); in.skipBytes(2); - Long result = Long.valueOf(status); + Object result = Long.valueOf(status); + result = data.getCommand().getConvertor().convert(result); + if (parts != null) { parts.add(result); } else { data.getPromise().setSuccess(result); } } else if (code == '$') { - Decoder decoder = data.getCommand().getReponseDecoder(); - if (decoder == null) { - decoder = data.getCodec(); - } - Object result = decoder.decode(readBytes(in)); + Object result = decoder(data).decode(readBytes(in)); if (parts != null) { parts.add(result); } else { @@ -79,20 +85,38 @@ public class RedisDecoder extends ReplayingDecoder { long size = readLong(in); List respParts = new ArrayList(); for (int i = 0; i < size; i++) { - decode(in, data, respParts); + decode(in, data, respParts, pubSubConnection); } - Decoder decoder = data.getCommand().getReponseDecoder(); - if (decoder == null) { - decoder = data.getCodec(); + Object result = decoder(data).decode(respParts); + if (data != null) { + if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(data.getCommand().getName())) { + nextDecoder = data.getNextDecoder(); + } + data.getPromise().setSuccess(result); + } else { + if (result instanceof PubSubMessage) { + pubSubConnection.onMessage((PubSubMessage) result); + } else { + pubSubConnection.onMessage((PubSubPatternMessage) result); + } } - Object result = decoder.decode(respParts); - data.getPromise().setSuccess(result); } else { throw new IllegalStateException("Can't decode replay " + (char)code); } } + private Decoder decoder(RedisData data) { + if (data == null) { + return nextDecoder; + } + Decoder decoder = data.getCommand().getReponseDecoder(); + if (decoder == null) { + decoder = data.getCodec(); + } + return decoder; + } + public ByteBuf readBytes(ByteBuf is) throws IOException { long l = readLong(is); if (l > Integer.MAX_VALUE) { diff --git a/src/main/java/org/redisson/client/handler/RedisEncoder.java b/src/main/java/org/redisson/client/handler/RedisEncoder.java index bb4c6e155..5f712bc79 100644 --- a/src/main/java/org/redisson/client/handler/RedisEncoder.java +++ b/src/main/java/org/redisson/client/handler/RedisEncoder.java @@ -50,8 +50,8 @@ public class RedisEncoder extends MessageToByteEncoder i++; } - String o = out.toString(CharsetUtil.UTF_8); - System.out.println(o); +// String o = out.toString(CharsetUtil.UTF_8); +// System.out.println(o); } private void writeArgument(ByteBuf out, byte[] arg) { diff --git a/src/main/java/org/redisson/client/protocol/BooleanReplayDecoder.java b/src/main/java/org/redisson/client/protocol/BooleanReplayConvertor.java similarity index 59% rename from src/main/java/org/redisson/client/protocol/BooleanReplayDecoder.java rename to src/main/java/org/redisson/client/protocol/BooleanReplayConvertor.java index 4cf511022..62ddbd1c5 100644 --- a/src/main/java/org/redisson/client/protocol/BooleanReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/BooleanReplayConvertor.java @@ -15,23 +15,12 @@ */ package org.redisson.client.protocol; -import java.util.List; - -import io.netty.buffer.ByteBuf; -import io.netty.util.CharsetUtil; - -public class BooleanReplayDecoder implements Decoder { +public class BooleanReplayConvertor implements Convertor { @Override - public Boolean decode(ByteBuf buf) { - String status = buf.readBytes(buf.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); - buf.skipBytes(2); - return Boolean.valueOf(status); + public Boolean convert(Object obj) { + return "1".equals(obj); } - @Override - public Boolean decode(List parts) { - throw new IllegalStateException(); - } } diff --git a/src/main/java/org/redisson/client/protocol/Convertor.java b/src/main/java/org/redisson/client/protocol/Convertor.java new file mode 100644 index 000000000..f1efc9fe2 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/Convertor.java @@ -0,0 +1,7 @@ +package org.redisson.client.protocol; + +public interface Convertor { + + R convert(Object obj); + +} diff --git a/src/main/java/org/redisson/client/protocol/EmptyConvertor.java b/src/main/java/org/redisson/client/protocol/EmptyConvertor.java new file mode 100644 index 000000000..fef945872 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/EmptyConvertor.java @@ -0,0 +1,10 @@ +package org.redisson.client.protocol; + +public class EmptyConvertor implements Convertor { + + @Override + public R convert(Object obj) { + return (R) obj; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/PubSubMessage.java b/src/main/java/org/redisson/client/protocol/PubSubMessage.java index e8aaefbf3..7c49e7d52 100644 --- a/src/main/java/org/redisson/client/protocol/PubSubMessage.java +++ b/src/main/java/org/redisson/client/protocol/PubSubMessage.java @@ -2,28 +2,26 @@ package org.redisson.client.protocol; public class PubSubMessage { - public enum Type {SUBSCRIBE, MESSAGE} + private final String channel; + private final Object value; - private Type type; - private String channel; - - public PubSubMessage(Type type, String channel) { + public PubSubMessage(String channel, Object value) { super(); - this.type = type; this.channel = channel; + this.value = value; } public String getChannel() { return channel; } - public Type getType() { - return type; + public Object getValue() { + return value; } @Override public String toString() { - return "PubSubReplay [type=" + type + ", channel=" + channel + "]"; + return "PubSubMessage [channel=" + channel + ", value=" + value + "]"; } } diff --git a/src/main/java/org/redisson/client/protocol/PubSubMessageDecoder.java b/src/main/java/org/redisson/client/protocol/PubSubMessageDecoder.java index 889f876f9..a4ddd5728 100644 --- a/src/main/java/org/redisson/client/protocol/PubSubMessageDecoder.java +++ b/src/main/java/org/redisson/client/protocol/PubSubMessageDecoder.java @@ -1,47 +1,22 @@ -/** - * Copyright 2014 Nikita Koksharov, Nickolay Borbit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.redisson.client.protocol; -import java.io.UnsupportedEncodingException; import java.util.List; import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; -public class PubSubMessageDecoder implements Codec { +public class PubSubMessageDecoder implements Decoder { @Override - public String decode(ByteBuf buf) { + public Object decode(ByteBuf buf) { String status = buf.toString(CharsetUtil.UTF_8); buf.skipBytes(2); return status; } @Override - public PubSubMessage decode(List parts) { - return new PubSubMessage(PubSubMessage.Type.valueOf(parts.get(0).toString().toUpperCase()), parts.get(1).toString()); - } - - @Override - public byte[] encode(int paramIndex, Object in) { - try { - return in.toString().getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new IllegalStateException(e); - } + public Object decode(List parts) { + return new PubSubMessage(parts.get(1).toString(), parts.get(2).toString()); } } diff --git a/src/main/java/org/redisson/client/protocol/PubSubPatternMessage.java b/src/main/java/org/redisson/client/protocol/PubSubPatternMessage.java new file mode 100644 index 000000000..0d94c6f42 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/PubSubPatternMessage.java @@ -0,0 +1,33 @@ +package org.redisson.client.protocol; + +public class PubSubPatternMessage { + + private final String pattern; + private final String channel; + private final Object value; + + public PubSubPatternMessage(String pattern, String channel, Object value) { + super(); + this.pattern = pattern; + this.channel = channel; + this.value = value; + } + + public String getPattern() { + return pattern; + } + + public String getChannel() { + return channel; + } + + public Object getValue() { + return value; + } + + @Override + public String toString() { + return "PubSubPatternMessage [pattern=" + pattern + ", channel=" + channel + ", value=" + value + "]"; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/PubSubPatternMessageDecoder.java b/src/main/java/org/redisson/client/protocol/PubSubPatternMessageDecoder.java new file mode 100644 index 000000000..0e7064791 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/PubSubPatternMessageDecoder.java @@ -0,0 +1,22 @@ +package org.redisson.client.protocol; + +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class PubSubPatternMessageDecoder implements Decoder { + + @Override + public Object decode(ByteBuf buf) { + String status = buf.toString(CharsetUtil.UTF_8); + buf.skipBytes(2); + return status; + } + + @Override + public Object decode(List parts) { + return new PubSubPatternMessage(parts.get(1).toString(), parts.get(2).toString(), parts.get(3)); + } + +} diff --git a/src/main/java/org/redisson/client/protocol/PubSubStatusDecoder.java b/src/main/java/org/redisson/client/protocol/PubSubStatusDecoder.java new file mode 100644 index 000000000..3137ec7c2 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/PubSubStatusDecoder.java @@ -0,0 +1,52 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class PubSubStatusDecoder implements Codec { + + @Override + public String decode(ByteBuf buf) { + String status = buf.toString(CharsetUtil.UTF_8); + buf.skipBytes(2); + return status; + } + + @Override + public PubSubStatusMessage decode(List parts) { + List channels = new ArrayList(); + for (Object part : parts.subList(1, parts.size()-1)) { + channels.add(part.toString()); + } + return new PubSubStatusMessage(PubSubStatusMessage.Type.valueOf(parts.get(0).toString().toUpperCase()), channels); + } + + @Override + public byte[] encode(int paramIndex, Object in) { + try { + return in.toString().getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/src/main/java/org/redisson/client/protocol/PubSubStatusMessage.java b/src/main/java/org/redisson/client/protocol/PubSubStatusMessage.java new file mode 100644 index 000000000..39219e359 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/PubSubStatusMessage.java @@ -0,0 +1,31 @@ +package org.redisson.client.protocol; + +import java.util.List; + +public class PubSubStatusMessage { + + public enum Type {SUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, UNSUBSCRIBE} + + private final Type type; + private final List channels; + + public PubSubStatusMessage(Type type, List channels) { + super(); + this.type = type; + this.channels = channels; + } + + public List getChannels() { + return channels; + } + + public Type getType() { + return type; + } + + @Override + public String toString() { + return "PubSubStatusMessage [type=" + type + ", channels=" + channels + "]"; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/RedisCommand.java b/src/main/java/org/redisson/client/protocol/RedisCommand.java index 4ac5c5a5e..ca8ad9918 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommand.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommand.java @@ -21,6 +21,7 @@ public class RedisCommand { private final String subName; private final int[] encodeParamIndexes; private Decoder reponseDecoder; + private Convertor convertor = new EmptyConvertor(); public RedisCommand(String name, String subName, int ... encodeParamIndexes) { this(name, subName, null, encodeParamIndexes); @@ -30,6 +31,13 @@ public class RedisCommand { this(name, null, null, encodeParamIndexes); } + public RedisCommand(String name, Convertor convertor, int ... encodeParamIndexes) { + this.name = name; + this.subName = null; + this.encodeParamIndexes = encodeParamIndexes; + this.convertor = convertor; + } + public RedisCommand(String name, Decoder reponseDecoder, int ... encodeParamIndexes) { this(name, null, reponseDecoder, encodeParamIndexes); } @@ -58,4 +66,8 @@ public class RedisCommand { return encodeParamIndexes; } + public Convertor getConvertor() { + return convertor; + } + } diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 2c85bf2a6..985e3d049 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -25,9 +25,12 @@ public interface RedisCommands { RedisCommand GET = new RedisCommand("GET"); RedisCommand SET = new RedisCommand("SET", new StringReplayDecoder(), 1); RedisCommand SETEX = new RedisCommand("SETEX", new StringReplayDecoder(), 2); - RedisCommand EXISTS = new RedisCommand("EXISTS", new BooleanReplayDecoder(), 1); + RedisCommand EXISTS = new RedisCommand("EXISTS", new BooleanReplayConvertor(), 1); RedisCommand PUBLISH = new RedisCommand("PUBLISH", 1); - RedisCommand SUBSCRIBE = new RedisCommand("SUBSCRIBE", 1); + RedisCommand SUBSCRIBE = new RedisCommand("SUBSCRIBE", 1); + RedisCommand UNSUBSCRIBE = new RedisCommand("UNSUBSCRIBE", 1); + + RedisCommand PSUBSCRIBE = new RedisCommand("PSUBSCRIBE", 1); } diff --git a/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 040cd1ff7..5ea05c3f7 100644 --- a/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -15,6 +15,14 @@ import org.redisson.core.*; public class RedissonBlockingQueueTest extends BaseTest { + @Test + public void testAwait() throws InterruptedException { + RBlockingQueue queue1 = redisson.getBlockingQueue("queue1"); + queue1.put(1); + + Assert.assertEquals((Integer)1, queue1.poll(10, TimeUnit.SECONDS)); + } + @Test public void testPollLastAndOfferFirstTo() throws InterruptedException { RBlockingQueue queue1 = redisson.getBlockingQueue("queue1"); @@ -30,7 +38,7 @@ public class RedissonBlockingQueueTest extends BaseTest { queue1.pollLastAndOfferFirstTo(queue2, 10, TimeUnit.SECONDS); MatcherAssert.assertThat(queue2, Matchers.contains(3, 4, 5, 6)); } - + @Test public void testAddOfferOrigin() { Queue queue = new LinkedList(); @@ -129,7 +137,7 @@ public class RedissonBlockingQueueTest extends BaseTest { final AtomicInteger counter = new AtomicInteger(); int total = 100; for (int i = 0; i < total; i++) { - // runnable won't be executed in any particular order, and hence, int value as well. + // runnable won't be executed in any particular order, and hence, int value as well. executor.submit(new Runnable() { @Override public void run() {