|
|
|
@ -15,56 +15,91 @@
|
|
|
|
|
*/
|
|
|
|
|
package org.redisson.client.handler;
|
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
|
|
|
|
|
import org.redisson.client.RedisException;
|
|
|
|
|
import org.redisson.client.handler.RedisCommandsQueue.QueueCommands;
|
|
|
|
|
import org.redisson.client.protocol.Decoder;
|
|
|
|
|
|
|
|
|
|
import io.netty.buffer.ByteBuf;
|
|
|
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
|
|
|
import io.netty.handler.codec.ReplayingDecoder;
|
|
|
|
|
import io.netty.util.CharsetUtil;
|
|
|
|
|
|
|
|
|
|
public class RedisDecoder extends ReplayingDecoder<Void> {
|
|
|
|
|
|
|
|
|
|
private static final char CR = '\r';
|
|
|
|
|
private static final char LF = '\n';
|
|
|
|
|
private static final char ZERO = '0';
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
|
|
|
|
int code = in.readByte();
|
|
|
|
|
String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
|
|
|
|
|
in.skipBytes(2);
|
|
|
|
|
out.add(status);
|
|
|
|
|
System.out.println("status: " + status);
|
|
|
|
|
RedisData<Object, Object> data = ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove();
|
|
|
|
|
|
|
|
|
|
ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove().setSuccess(status);
|
|
|
|
|
int code = in.readByte();
|
|
|
|
|
if (code == '+') {
|
|
|
|
|
Object result = data.getCommand().getReponseDecoder().decode(in);
|
|
|
|
|
data.getPromise().setSuccess(result);
|
|
|
|
|
} else if (code == '-') {
|
|
|
|
|
Object result = data.getCommand().getReponseDecoder().decode(in);
|
|
|
|
|
data.getPromise().setFailure(new RedisException(result.toString()));
|
|
|
|
|
} else if (code == '$') {
|
|
|
|
|
Decoder<Object> decoder = data.getCommand().getReponseDecoder();
|
|
|
|
|
if (decoder == null) {
|
|
|
|
|
decoder = data.getCodec();
|
|
|
|
|
}
|
|
|
|
|
Object result = decoder.decode(readBytes(in));
|
|
|
|
|
data.getPromise().setSuccess(result);
|
|
|
|
|
} else {
|
|
|
|
|
throw new IllegalStateException("Can't decode replay");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
|
|
|
|
|
// switch (code) {
|
|
|
|
|
// case StatusReply.MARKER: {
|
|
|
|
|
// String status = is.readBytes(is.bytesBefore((byte) '\r')).toString(Charsets.UTF_8);
|
|
|
|
|
// is.skipBytes(2);
|
|
|
|
|
// return new StatusReply(status);
|
|
|
|
|
// }
|
|
|
|
|
// case ErrorReply.MARKER: {
|
|
|
|
|
// String error = is.readBytes(is.bytesBefore((byte) '\r')).toString(Charsets.UTF_8);
|
|
|
|
|
// is.skipBytes(2);
|
|
|
|
|
// return new ErrorReply(error);
|
|
|
|
|
// }
|
|
|
|
|
// case IntegerReply.MARKER: {
|
|
|
|
|
// return new IntegerReply(readLong(is));
|
|
|
|
|
// }
|
|
|
|
|
// case BulkReply.MARKER: {
|
|
|
|
|
// return new BulkReply(readBytes(is));
|
|
|
|
|
// }
|
|
|
|
|
// case MultiBulkReply.MARKER: {
|
|
|
|
|
// if (reply == null) {
|
|
|
|
|
// return decodeMultiBulkReply(is);
|
|
|
|
|
// } else {
|
|
|
|
|
// return new RedisReplyDecoder(false).decodeMultiBulkReply(is);
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
// default: {
|
|
|
|
|
// throw new IOException("Unexpected character in stream: " + code);
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public ByteBuf readBytes(ByteBuf is) throws IOException {
|
|
|
|
|
long l = readLong(is);
|
|
|
|
|
if (l > Integer.MAX_VALUE) {
|
|
|
|
|
throw new IllegalArgumentException(
|
|
|
|
|
"Java only supports arrays up to " + Integer.MAX_VALUE + " in size");
|
|
|
|
|
}
|
|
|
|
|
int size = (int) l;
|
|
|
|
|
if (size == -1) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
ByteBuf buffer = is.readSlice(size);
|
|
|
|
|
int cr = is.readByte();
|
|
|
|
|
int lf = is.readByte();
|
|
|
|
|
if (cr != CR || lf != LF) {
|
|
|
|
|
throw new IOException("Improper line ending: " + cr + ", " + lf);
|
|
|
|
|
}
|
|
|
|
|
return buffer;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static long readLong(ByteBuf is) throws IOException {
|
|
|
|
|
long size = 0;
|
|
|
|
|
int sign = 1;
|
|
|
|
|
int read = is.readByte();
|
|
|
|
|
if (read == '-') {
|
|
|
|
|
read = is.readByte();
|
|
|
|
|
sign = -1;
|
|
|
|
|
}
|
|
|
|
|
do {
|
|
|
|
|
if (read == CR) {
|
|
|
|
|
if (is.readByte() == LF) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
int value = read - ZERO;
|
|
|
|
|
if (value >= 0 && value < 10) {
|
|
|
|
|
size *= 10;
|
|
|
|
|
size += value;
|
|
|
|
|
} else {
|
|
|
|
|
throw new IOException("Invalid character in integer");
|
|
|
|
|
}
|
|
|
|
|
read = is.readByte();
|
|
|
|
|
} while (true);
|
|
|
|
|
return size * sign;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|