PubSub messaging handling

pull/243/head
Nikita 10 years ago
parent e5da696339
commit 7a35bdd31e

@ -21,7 +21,9 @@ import java.util.concurrent.ExecutionException;
import org.redisson.client.handler.RedisCommandsQueue;
import org.redisson.client.handler.RedisDecoder;
import org.redisson.client.handler.RedisEncoder;
import org.redisson.client.protocol.PubSubMessage;
import org.redisson.client.protocol.PubSubStatusMessage;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@ -112,8 +114,26 @@ public class RedisClient {
Future<Long> m = rpsc.publish("sss", "123");
System.out.println("out: " + m.get());
Future<PubSubMessage> m1 = rpsc.subscribe("sss");
Future<PubSubStatusMessage> m1 = rpsc.psubscribe("ss*");
System.out.println("out: " + m1.get());
rpsc.addListener(new RedisPubSubListener<String>() {
@Override
public void onMessage(String channel, String message) {
System.out.println("incoming message: " + message);
}
@Override
public void onPatternMessage(String pattern, String channel, String message) {
System.out.println("incoming pattern pattern: " + pattern
+ " channel: " + channel + " message: " + message);
}
});
final RedisClient c2 = new RedisClient("127.0.0.1", 6379);
Long res = c2.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444");
// System.out.println("published: " + res);

@ -1,30 +1,68 @@
package org.redisson.client;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.client.handler.RedisData;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.PubSubStatusMessage;
import org.redisson.client.protocol.PubSubStatusDecoder;
import org.redisson.client.protocol.PubSubMessage;
import org.redisson.client.protocol.PubSubMessageDecoder;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;
import org.redisson.client.protocol.PubSubMessageDecoder;
import org.redisson.client.protocol.PubSubPatternMessage;
import org.redisson.client.protocol.PubSubPatternMessageDecoder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
public class RedisPubSubConnection {
public static final AttributeKey<RedisPubSubConnection> CONNECTION = AttributeKey.valueOf("connection");
final ConcurrentLinkedQueue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();
final Channel channel;
final RedisClient redisClient;
public RedisPubSubConnection(RedisClient redisClient, Channel channel) {
this.redisClient = redisClient;
this.channel = channel;
channel.attr(CONNECTION).set(this);
}
public Future<PubSubMessage> subscribe(String ... channel) {
return async(new PubSubMessageDecoder(), RedisCommands.SUBSCRIBE, channel);
public void addListener(RedisPubSubListener listener) {
listeners.add(listener);
}
public void onMessage(PubSubMessage message) {
for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
redisPubSubListener.onMessage(message.getChannel(), message.getValue());
}
}
public void onMessage(PubSubPatternMessage message) {
for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
redisPubSubListener.onPatternMessage(message.getPattern(), message.getChannel(), message.getValue());
}
}
public Future<PubSubStatusMessage> subscribe(String ... channel) {
return async(new PubSubStatusDecoder(), new PubSubMessageDecoder(), RedisCommands.SUBSCRIBE, channel);
}
public Future<PubSubStatusMessage> psubscribe(String ... channel) {
return async(new PubSubStatusDecoder(), new PubSubPatternMessageDecoder(), RedisCommands.PSUBSCRIBE, channel);
}
public Future<PubSubStatusMessage> unsubscribe(String ... channel) {
return async(new PubSubStatusDecoder(), RedisCommands.SUBSCRIBE, channel);
}
public Future<Long> publish(String channel, String msg) {
@ -37,6 +75,13 @@ public class RedisPubSubConnection {
return promise;
}
public <T, R> Future<R> async(Codec encoder, Decoder<Object> nextDecoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
channel.writeAndFlush(new RedisData<T, R>(promise, nextDecoder, encoder, command, params));
return promise;
}
public ChannelFuture closeAsync() {
return channel.close();
}

@ -0,0 +1,9 @@
package org.redisson.client;
public interface RedisPubSubListener<V> {
void onMessage(String channel, V message);
void onPatternMessage(String pattern, String channel, V message);
}

@ -18,7 +18,7 @@ package org.redisson.client.handler;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.RedisCommand;
import io.netty.util.concurrent.Promise;
@ -30,12 +30,18 @@ public class RedisData<T, R> {
final Object[] params;
final Codec codec;
final AtomicBoolean sended = new AtomicBoolean();
final Decoder<Object> nextDecoder;
public RedisData(Promise<R> promise, Codec encoder, RedisCommand<T> command, Object[] params) {
public RedisData(Promise<R> promise, Codec codec, RedisCommand<T> command, Object[] params) {
this(promise, null, codec, command, params);
}
public RedisData(Promise<R> promise, Decoder<Object> nextDecoder, Codec codec, RedisCommand<T> command, Object[] params) {
this.promise = promise;
this.command = command;
this.params = params;
this.codec = encoder;
this.codec = codec;
this.nextDecoder = nextDecoder;
}
public RedisCommand<T> getCommand() {
@ -46,6 +52,10 @@ public class RedisData<T, R> {
return params;
}
public Decoder<Object> getNextDecoder() {
return nextDecoder;
}
public Promise<R> getPromise() {
return promise;
}

@ -17,11 +17,15 @@ package org.redisson.client.handler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.handler.RedisCommandsQueue.QueueCommands;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.PubSubMessage;
import org.redisson.client.protocol.PubSubPatternMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@ -34,17 +38,21 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
public static final char LF = '\n';
private static final char ZERO = '0';
private Decoder<Object> nextDecoder;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
RedisData<Object, Object> data = ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove();
RedisPubSubConnection pubSubConnection = ctx.channel().attr(RedisPubSubConnection.CONNECTION).get();
decode(in, data, null);
decode(in, data, null, pubSubConnection);
ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
}
private void decode(ByteBuf in, RedisData<Object, Object> data, List<Object> parts) throws IOException {
private void decode(ByteBuf in, RedisData<Object, Object> data, List<Object> parts, RedisPubSubConnection pubSubConnection) throws IOException {
int code = in.readByte();
// System.out.println("trying decode -- " + (char)code);
if (code == '+') {
Object result = data.getCommand().getReponseDecoder().decode(in);
if (parts != null) {
@ -58,18 +66,16 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
} else if (code == ':') {
String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2);
Long result = Long.valueOf(status);
Object result = Long.valueOf(status);
result = data.getCommand().getConvertor().convert(result);
if (parts != null) {
parts.add(result);
} else {
data.getPromise().setSuccess(result);
}
} else if (code == '$') {
Decoder<Object> decoder = data.getCommand().getReponseDecoder();
if (decoder == null) {
decoder = data.getCodec();
}
Object result = decoder.decode(readBytes(in));
Object result = decoder(data).decode(readBytes(in));
if (parts != null) {
parts.add(result);
} else {
@ -79,20 +85,38 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
long size = readLong(in);
List<Object> respParts = new ArrayList<Object>();
for (int i = 0; i < size; i++) {
decode(in, data, respParts);
decode(in, data, respParts, pubSubConnection);
}
Decoder<Object> decoder = data.getCommand().getReponseDecoder();
if (decoder == null) {
decoder = data.getCodec();
Object result = decoder(data).decode(respParts);
if (data != null) {
if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(data.getCommand().getName())) {
nextDecoder = data.getNextDecoder();
}
Object result = decoder.decode(respParts);
data.getPromise().setSuccess(result);
} else {
if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
} else {
pubSubConnection.onMessage((PubSubPatternMessage) result);
}
}
} else {
throw new IllegalStateException("Can't decode replay " + (char)code);
}
}
private Decoder<Object> decoder(RedisData<Object, Object> data) {
if (data == null) {
return nextDecoder;
}
Decoder<Object> decoder = data.getCommand().getReponseDecoder();
if (decoder == null) {
decoder = data.getCodec();
}
return decoder;
}
public ByteBuf readBytes(ByteBuf is) throws IOException {
long l = readLong(is);
if (l > Integer.MAX_VALUE) {

@ -50,8 +50,8 @@ public class RedisEncoder extends MessageToByteEncoder<RedisData<Object, Object>
i++;
}
String o = out.toString(CharsetUtil.UTF_8);
System.out.println(o);
// String o = out.toString(CharsetUtil.UTF_8);
// System.out.println(o);
}
private void writeArgument(ByteBuf out, byte[] arg) {

@ -15,23 +15,12 @@
*/
package org.redisson.client.protocol;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class BooleanReplayDecoder implements Decoder<Boolean> {
public class BooleanReplayConvertor implements Convertor<Boolean> {
@Override
public Boolean decode(ByteBuf buf) {
String status = buf.readBytes(buf.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return Boolean.valueOf(status);
public Boolean convert(Object obj) {
return "1".equals(obj);
}
@Override
public Boolean decode(List<Object> parts) {
throw new IllegalStateException();
}
}

@ -0,0 +1,7 @@
package org.redisson.client.protocol;
public interface Convertor<R> {
R convert(Object obj);
}

@ -0,0 +1,10 @@
package org.redisson.client.protocol;
public class EmptyConvertor<R> implements Convertor<R> {
@Override
public R convert(Object obj) {
return (R) obj;
}
}

@ -2,28 +2,26 @@ package org.redisson.client.protocol;
public class PubSubMessage {
public enum Type {SUBSCRIBE, MESSAGE}
private final String channel;
private final Object value;
private Type type;
private String channel;
public PubSubMessage(Type type, String channel) {
public PubSubMessage(String channel, Object value) {
super();
this.type = type;
this.channel = channel;
this.value = value;
}
public String getChannel() {
return channel;
}
public Type getType() {
return type;
public Object getValue() {
return value;
}
@Override
public String toString() {
return "PubSubReplay [type=" + type + ", channel=" + channel + "]";
return "PubSubMessage [channel=" + channel + ", value=" + value + "]";
}
}

@ -1,47 +1,22 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol;
import java.io.UnsupportedEncodingException;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class PubSubMessageDecoder implements Codec {
public class PubSubMessageDecoder implements Decoder<Object> {
@Override
public String decode(ByteBuf buf) {
public Object decode(ByteBuf buf) {
String status = buf.toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return status;
}
@Override
public PubSubMessage decode(List<Object> parts) {
return new PubSubMessage(PubSubMessage.Type.valueOf(parts.get(0).toString().toUpperCase()), parts.get(1).toString());
}
@Override
public byte[] encode(int paramIndex, Object in) {
try {
return in.toString().getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException(e);
}
public Object decode(List<Object> parts) {
return new PubSubMessage(parts.get(1).toString(), parts.get(2).toString());
}
}

@ -0,0 +1,33 @@
package org.redisson.client.protocol;
public class PubSubPatternMessage {
private final String pattern;
private final String channel;
private final Object value;
public PubSubPatternMessage(String pattern, String channel, Object value) {
super();
this.pattern = pattern;
this.channel = channel;
this.value = value;
}
public String getPattern() {
return pattern;
}
public String getChannel() {
return channel;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "PubSubPatternMessage [pattern=" + pattern + ", channel=" + channel + ", value=" + value + "]";
}
}

@ -0,0 +1,22 @@
package org.redisson.client.protocol;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class PubSubPatternMessageDecoder implements Decoder<Object> {
@Override
public Object decode(ByteBuf buf) {
String status = buf.toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return status;
}
@Override
public Object decode(List<Object> parts) {
return new PubSubPatternMessage(parts.get(1).toString(), parts.get(2).toString(), parts.get(3));
}
}

@ -0,0 +1,52 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class PubSubStatusDecoder implements Codec {
@Override
public String decode(ByteBuf buf) {
String status = buf.toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return status;
}
@Override
public PubSubStatusMessage decode(List<Object> parts) {
List<String> channels = new ArrayList<String>();
for (Object part : parts.subList(1, parts.size()-1)) {
channels.add(part.toString());
}
return new PubSubStatusMessage(PubSubStatusMessage.Type.valueOf(parts.get(0).toString().toUpperCase()), channels);
}
@Override
public byte[] encode(int paramIndex, Object in) {
try {
return in.toString().getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException(e);
}
}
}

@ -0,0 +1,31 @@
package org.redisson.client.protocol;
import java.util.List;
public class PubSubStatusMessage {
public enum Type {SUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, UNSUBSCRIBE}
private final Type type;
private final List<String> channels;
public PubSubStatusMessage(Type type, List<String> channels) {
super();
this.type = type;
this.channels = channels;
}
public List<String> getChannels() {
return channels;
}
public Type getType() {
return type;
}
@Override
public String toString() {
return "PubSubStatusMessage [type=" + type + ", channels=" + channels + "]";
}
}

@ -21,6 +21,7 @@ public class RedisCommand<R> {
private final String subName;
private final int[] encodeParamIndexes;
private Decoder<R> reponseDecoder;
private Convertor<R> convertor = new EmptyConvertor<R>();
public RedisCommand(String name, String subName, int ... encodeParamIndexes) {
this(name, subName, null, encodeParamIndexes);
@ -30,6 +31,13 @@ public class RedisCommand<R> {
this(name, null, null, encodeParamIndexes);
}
public RedisCommand(String name, Convertor<R> convertor, int ... encodeParamIndexes) {
this.name = name;
this.subName = null;
this.encodeParamIndexes = encodeParamIndexes;
this.convertor = convertor;
}
public RedisCommand(String name, Decoder<R> reponseDecoder, int ... encodeParamIndexes) {
this(name, null, reponseDecoder, encodeParamIndexes);
}
@ -58,4 +66,8 @@ public class RedisCommand<R> {
return encodeParamIndexes;
}
public Convertor<R> getConvertor() {
return convertor;
}
}

@ -25,9 +25,12 @@ public interface RedisCommands {
RedisCommand<Object> GET = new RedisCommand<Object>("GET");
RedisCommand<String> SET = new RedisCommand<String>("SET", new StringReplayDecoder(), 1);
RedisCommand<String> SETEX = new RedisCommand<String>("SETEX", new StringReplayDecoder(), 2);
RedisCommand<Boolean> EXISTS = new RedisCommand<Boolean>("EXISTS", new BooleanReplayDecoder(), 1);
RedisCommand<Boolean> EXISTS = new RedisCommand<Boolean>("EXISTS", new BooleanReplayConvertor(), 1);
RedisCommand<Long> PUBLISH = new RedisCommand<Long>("PUBLISH", 1);
RedisCommand<PubSubMessageDecoder> SUBSCRIBE = new RedisCommand<PubSubMessageDecoder>("SUBSCRIBE", 1);
RedisCommand<PubSubStatusMessage> SUBSCRIBE = new RedisCommand<PubSubStatusMessage>("SUBSCRIBE", 1);
RedisCommand<PubSubStatusMessage> UNSUBSCRIBE = new RedisCommand<PubSubStatusMessage>("UNSUBSCRIBE", 1);
RedisCommand<PubSubStatusMessage> PSUBSCRIBE = new RedisCommand<PubSubStatusMessage>("PSUBSCRIBE", 1);
}

@ -15,6 +15,14 @@ import org.redisson.core.*;
public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testAwait() throws InterruptedException {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue1");
queue1.put(1);
Assert.assertEquals((Integer)1, queue1.poll(10, TimeUnit.SECONDS));
}
@Test
public void testPollLastAndOfferFirstTo() throws InterruptedException {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue1");

Loading…
Cancel
Save