From 93eb8fad274249ecbc06b9dcae9ca4142de00d5f Mon Sep 17 00:00:00 2001 From: Nikita Date: Sun, 5 Jul 2015 14:57:25 +0300 Subject: [PATCH] Encoder/Decoder added --- .../java/org/redisson/client/RedisClient.java | 43 ++++--- .../org/redisson/client/RedisException.java | 15 +++ .../client/handler/RedisCommandsQueue.java | 14 +-- .../redisson/client/handler/RedisData.java | 26 +++-- .../redisson/client/handler/RedisDecoder.java | 105 ++++++++++++------ .../redisson/client/handler/RedisEncoder.java | 8 +- .../org/redisson/client/protocol/Codec.java | 5 + .../{ObjectDecoder.java => Decoder.java} | 6 +- .../org/redisson/client/protocol/Encoder.java | 7 ++ .../client/protocol/RedisCommand.java | 12 +- .../client/protocol/RedisCommands.java | 4 +- .../redisson/client/protocol/StringCodec.java | 18 +++ .../client/protocol/StringDecoder.java | 20 ---- ...eDecoder.java => StringReplayDecoder.java} | 12 +- 14 files changed, 197 insertions(+), 98 deletions(-) create mode 100644 src/main/java/org/redisson/client/RedisException.java create mode 100644 src/main/java/org/redisson/client/protocol/Codec.java rename src/main/java/org/redisson/client/protocol/{ObjectDecoder.java => Decoder.java} (87%) create mode 100644 src/main/java/org/redisson/client/protocol/Encoder.java create mode 100644 src/main/java/org/redisson/client/protocol/StringCodec.java delete mode 100644 src/main/java/org/redisson/client/protocol/StringDecoder.java rename src/main/java/org/redisson/client/protocol/{ResponseDecoder.java => StringReplayDecoder.java} (66%) diff --git a/src/main/java/org/redisson/client/RedisClient.java b/src/main/java/org/redisson/client/RedisClient.java index 85326892a..5f226e77b 100644 --- a/src/main/java/org/redisson/client/RedisClient.java +++ b/src/main/java/org/redisson/client/RedisClient.java @@ -21,8 +21,10 @@ import org.redisson.client.handler.RedisCommandsQueue; import org.redisson.client.handler.RedisData; import org.redisson.client.handler.RedisDecoder; import org.redisson.client.handler.RedisEncoder; +import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.StringCodec; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -66,27 +68,38 @@ public class RedisClient { return future; } - public Future execute(RedisCommand command, Object ... params) { +// public Future execute(Codec encoder, RedisCommand command, Object ... params) { +// Promise promise = bootstrap.group().next().newPromise(); +// channel.writeAndFlush(new RedisData(promise, encoder, command, params)); +// return promise; +// } + + public Future execute(Codec encoder, RedisCommand command, Object ... params) { Promise promise = bootstrap.group().next().newPromise(); - channel.writeAndFlush(new RedisData(promise, command, params)); + channel.writeAndFlush(new RedisData(promise, encoder, command, params)); return promise; } public static void main(String[] args) throws InterruptedException { RedisClient rc = new RedisClient("127.0.0.1", 6379); rc.connect().sync(); - for (int i = 0; i < 10000; i++) { - final int j = i; - Future res = rc.execute(RedisCommands.SET, "test", "" + Math.random()); - res.addListener(new FutureListener() { - - @Override - public void operationComplete(Future future) throws Exception { - System.out.println("res: " + future.getNow() + " " + j); - } - - }); - } -// rc.execute(RedisCommands.GET, "test"); + Future res = rc.execute(new StringCodec(), RedisCommands.SET, "test", "" + Math.random()); + res.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) throws Exception { + System.out.println("res 1: " + future.getNow()); + } + + }); + + Future r = rc.execute(new StringCodec(), RedisCommands.GET, "test"); + r.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) throws Exception { + System.out.println("res 2: " + future.getNow()); + } + }); } } diff --git a/src/main/java/org/redisson/client/RedisException.java b/src/main/java/org/redisson/client/RedisException.java new file mode 100644 index 000000000..f5bb25971 --- /dev/null +++ b/src/main/java/org/redisson/client/RedisException.java @@ -0,0 +1,15 @@ +package org.redisson.client; + +public class RedisException extends RuntimeException { + + private static final long serialVersionUID = 3389820652701696154L; + + public RedisException(String message, Throwable cause) { + super(message, cause); + } + + public RedisException(String message) { + super(message); + } + +} diff --git a/src/main/java/org/redisson/client/handler/RedisCommandsQueue.java b/src/main/java/org/redisson/client/handler/RedisCommandsQueue.java index bf4f6fbb8..d9e6b8b1a 100644 --- a/src/main/java/org/redisson/client/handler/RedisCommandsQueue.java +++ b/src/main/java/org/redisson/client/handler/RedisCommandsQueue.java @@ -28,9 +28,9 @@ public class RedisCommandsQueue extends ChannelDuplexHandler { public enum QueueCommands {NEXT_COMMAND} - public static final AttributeKey> REPLAY_PROMISE = AttributeKey.valueOf("promise"); + public static final AttributeKey> REPLAY_PROMISE = AttributeKey.valueOf("promise"); - private final Queue> queue = PlatformDependent.newMpscQueue(); + private final Queue> queue = PlatformDependent.newMpscQueue(); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { @@ -45,8 +45,8 @@ public class RedisCommandsQueue extends ChannelDuplexHandler { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof RedisData) { - RedisData data = (RedisData) msg; - if (data.getSend().get()) { + RedisData data = (RedisData) msg; + if (data.getSended().get()) { super.write(ctx, msg, promise); } else { queue.add(data); @@ -58,9 +58,9 @@ public class RedisCommandsQueue extends ChannelDuplexHandler { } private void sendData(ChannelHandlerContext ctx) throws Exception { - RedisData data = queue.peek(); - if (data != null && data.getSend().compareAndSet(false, true)) { - ctx.channel().attr(REPLAY_PROMISE).set(data.getPromise()); + RedisData data = queue.peek(); + if (data != null && data.getSended().compareAndSet(false, true)) { + ctx.channel().attr(REPLAY_PROMISE).set(data); ctx.channel().writeAndFlush(data); } } diff --git a/src/main/java/org/redisson/client/handler/RedisData.java b/src/main/java/org/redisson/client/handler/RedisData.java index 03a30552a..14e6acffa 100644 --- a/src/main/java/org/redisson/client/handler/RedisData.java +++ b/src/main/java/org/redisson/client/handler/RedisData.java @@ -17,24 +17,28 @@ package org.redisson.client.handler; import java.util.concurrent.atomic.AtomicBoolean; +import org.redisson.client.protocol.Codec; +import org.redisson.client.protocol.Encoder; import org.redisson.client.protocol.RedisCommand; import io.netty.util.concurrent.Promise; -public class RedisData { +public class RedisData { - Promise promise; - RedisCommand command; - Object[] params; - AtomicBoolean send = new AtomicBoolean(); + final Promise promise; + final RedisCommand command; + final Object[] params; + final Codec codec; + final AtomicBoolean sended = new AtomicBoolean(); - public RedisData(Promise promise, RedisCommand command, Object[] params) { + public RedisData(Promise promise, Codec encoder, RedisCommand command, Object[] params) { this.promise = promise; this.command = command; this.params = params; + this.codec = encoder; } - public RedisCommand getCommand() { + public RedisCommand getCommand() { return command; } @@ -46,8 +50,12 @@ public class RedisData { return promise; } - public AtomicBoolean getSend() { - return send; + public AtomicBoolean getSended() { + return sended; + } + + public Codec getCodec() { + return codec; } } diff --git a/src/main/java/org/redisson/client/handler/RedisDecoder.java b/src/main/java/org/redisson/client/handler/RedisDecoder.java index c6a4e5527..4d64cd23e 100644 --- a/src/main/java/org/redisson/client/handler/RedisDecoder.java +++ b/src/main/java/org/redisson/client/handler/RedisDecoder.java @@ -15,56 +15,91 @@ */ package org.redisson.client.handler; +import java.io.IOException; import java.util.List; +import org.redisson.client.RedisException; import org.redisson.client.handler.RedisCommandsQueue.QueueCommands; +import org.redisson.client.protocol.Decoder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; -import io.netty.util.CharsetUtil; public class RedisDecoder extends ReplayingDecoder { + private static final char CR = '\r'; + private static final char LF = '\n'; + private static final char ZERO = '0'; + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - int code = in.readByte(); - String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); - in.skipBytes(2); - out.add(status); - System.out.println("status: " + status); + RedisData data = ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove(); - ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove().setSuccess(status); + int code = in.readByte(); + if (code == '+') { + Object result = data.getCommand().getReponseDecoder().decode(in); + data.getPromise().setSuccess(result); + } else if (code == '-') { + Object result = data.getCommand().getReponseDecoder().decode(in); + data.getPromise().setFailure(new RedisException(result.toString())); + } else if (code == '$') { + Decoder decoder = data.getCommand().getReponseDecoder(); + if (decoder == null) { + decoder = data.getCodec(); + } + Object result = decoder.decode(readBytes(in)); + data.getPromise().setSuccess(result); + } else { + throw new IllegalStateException("Can't decode replay"); + } ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); -// switch (code) { -// case StatusReply.MARKER: { -// String status = is.readBytes(is.bytesBefore((byte) '\r')).toString(Charsets.UTF_8); -// is.skipBytes(2); -// return new StatusReply(status); -// } -// case ErrorReply.MARKER: { -// String error = is.readBytes(is.bytesBefore((byte) '\r')).toString(Charsets.UTF_8); -// is.skipBytes(2); -// return new ErrorReply(error); -// } -// case IntegerReply.MARKER: { -// return new IntegerReply(readLong(is)); -// } -// case BulkReply.MARKER: { -// return new BulkReply(readBytes(is)); -// } -// case MultiBulkReply.MARKER: { -// if (reply == null) { -// return decodeMultiBulkReply(is); -// } else { -// return new RedisReplyDecoder(false).decodeMultiBulkReply(is); -// } -// } -// default: { -// throw new IOException("Unexpected character in stream: " + code); -// } -// } + } + + public ByteBuf readBytes(ByteBuf is) throws IOException { + long l = readLong(is); + if (l > Integer.MAX_VALUE) { + throw new IllegalArgumentException( + "Java only supports arrays up to " + Integer.MAX_VALUE + " in size"); + } + int size = (int) l; + if (size == -1) { + return null; + } + ByteBuf buffer = is.readSlice(size); + int cr = is.readByte(); + int lf = is.readByte(); + if (cr != CR || lf != LF) { + throw new IOException("Improper line ending: " + cr + ", " + lf); + } + return buffer; + } + + public static long readLong(ByteBuf is) throws IOException { + long size = 0; + int sign = 1; + int read = is.readByte(); + if (read == '-') { + read = is.readByte(); + sign = -1; + } + do { + if (read == CR) { + if (is.readByte() == LF) { + break; + } + } + int value = read - ZERO; + if (value >= 0 && value < 10) { + size *= 10; + size += value; + } else { + throw new IOException("Invalid character in integer"); + } + read = is.readByte(); + } while (true); + return size * sign; } } diff --git a/src/main/java/org/redisson/client/handler/RedisEncoder.java b/src/main/java/org/redisson/client/handler/RedisEncoder.java index ca3b5e284..e3bcc3195 100644 --- a/src/main/java/org/redisson/client/handler/RedisEncoder.java +++ b/src/main/java/org/redisson/client/handler/RedisEncoder.java @@ -22,14 +22,14 @@ import io.netty.util.AttributeKey; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.Promise; -public class RedisEncoder extends MessageToByteEncoder> { +public class RedisEncoder extends MessageToByteEncoder> { final char ARGS_PREFIX = '*'; final char BYTES_PREFIX = '$'; final byte[] CRLF = "\r\n".getBytes(); @Override - protected void encode(ChannelHandlerContext ctx, RedisData msg, ByteBuf out) throws Exception { + protected void encode(ChannelHandlerContext ctx, RedisData msg, ByteBuf out) throws Exception { out.writeByte(ARGS_PREFIX); out.writeBytes(toChars(1 + msg.getParams().length)); out.writeBytes(CRLF); @@ -39,8 +39,8 @@ public class RedisEncoder extends MessageToByteEncoder> { writeArgument(out, param.toString().getBytes("UTF-8")); } - String o = out.toString(CharsetUtil.UTF_8); - System.out.println(o); +// String o = out.toString(CharsetUtil.UTF_8); +// System.out.println(o); } private void writeArgument(ByteBuf out, byte[] arg) { diff --git a/src/main/java/org/redisson/client/protocol/Codec.java b/src/main/java/org/redisson/client/protocol/Codec.java new file mode 100644 index 000000000..6e03ed604 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/Codec.java @@ -0,0 +1,5 @@ +package org.redisson.client.protocol; + +public interface Codec extends Encoder, Decoder { + +} diff --git a/src/main/java/org/redisson/client/protocol/ObjectDecoder.java b/src/main/java/org/redisson/client/protocol/Decoder.java similarity index 87% rename from src/main/java/org/redisson/client/protocol/ObjectDecoder.java rename to src/main/java/org/redisson/client/protocol/Decoder.java index 14ae5d485..7f8f22b2c 100644 --- a/src/main/java/org/redisson/client/protocol/ObjectDecoder.java +++ b/src/main/java/org/redisson/client/protocol/Decoder.java @@ -15,6 +15,10 @@ */ package org.redisson.client.protocol; -public class ObjectDecoder implements ResponseDecoder { +import io.netty.buffer.ByteBuf; + +public interface Decoder { + + R decode(ByteBuf buf); } diff --git a/src/main/java/org/redisson/client/protocol/Encoder.java b/src/main/java/org/redisson/client/protocol/Encoder.java new file mode 100644 index 000000000..5920e5109 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/Encoder.java @@ -0,0 +1,7 @@ +package org.redisson.client.protocol; + +public interface Encoder { + + Object encode(Object in); + +} diff --git a/src/main/java/org/redisson/client/protocol/RedisCommand.java b/src/main/java/org/redisson/client/protocol/RedisCommand.java index f381d6d78..25c6bfb1b 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommand.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommand.java @@ -17,10 +17,14 @@ package org.redisson.client.protocol; public class RedisCommand { - private String name; - private ResponseDecoder reponseDecoder; + private final String name; + private Decoder reponseDecoder; - public RedisCommand(String name, ResponseDecoder reponseDecoder) { + public RedisCommand(String name) { + this(name, null); + } + + public RedisCommand(String name, Decoder reponseDecoder) { super(); this.name = name; this.reponseDecoder = reponseDecoder; @@ -30,7 +34,7 @@ public class RedisCommand { return name; } - public ResponseDecoder getReponseDecoder() { + public Decoder getReponseDecoder() { return reponseDecoder; } diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 93a6b0f9a..6bcc6b21a 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -17,7 +17,7 @@ package org.redisson.client.protocol; public interface RedisCommands { - RedisCommand GET = new RedisCommand("GET", new ObjectDecoder()); - RedisCommand SET = new RedisCommand("SET", new StringDecoder()); + RedisCommand GET = new RedisCommand("GET"); + RedisCommand SET = new RedisCommand("SET", new StringReplayDecoder()); } diff --git a/src/main/java/org/redisson/client/protocol/StringCodec.java b/src/main/java/org/redisson/client/protocol/StringCodec.java new file mode 100644 index 000000000..20dc7709d --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/StringCodec.java @@ -0,0 +1,18 @@ +package org.redisson.client.protocol; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class StringCodec implements Codec { + + @Override + public Object encode(Object in) { + return in.toString(); + } + + @Override + public Object decode(ByteBuf buf) { + return buf.toString(CharsetUtil.UTF_8); + } + +} diff --git a/src/main/java/org/redisson/client/protocol/StringDecoder.java b/src/main/java/org/redisson/client/protocol/StringDecoder.java deleted file mode 100644 index 0f9ea8a40..000000000 --- a/src/main/java/org/redisson/client/protocol/StringDecoder.java +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Copyright 2014 Nikita Koksharov, Nickolay Borbit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.client.protocol; - -public class StringDecoder implements ResponseDecoder { - -} diff --git a/src/main/java/org/redisson/client/protocol/ResponseDecoder.java b/src/main/java/org/redisson/client/protocol/StringReplayDecoder.java similarity index 66% rename from src/main/java/org/redisson/client/protocol/ResponseDecoder.java rename to src/main/java/org/redisson/client/protocol/StringReplayDecoder.java index 2ad1f8289..05d841794 100644 --- a/src/main/java/org/redisson/client/protocol/ResponseDecoder.java +++ b/src/main/java/org/redisson/client/protocol/StringReplayDecoder.java @@ -15,6 +15,16 @@ */ package org.redisson.client.protocol; -public interface ResponseDecoder { +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class StringReplayDecoder implements Decoder { + + @Override + public String decode(ByteBuf buf) { + String status = buf.readBytes(buf.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); + buf.skipBytes(2); + return status; + } }