diff --git a/src/main/java/org/redisson/client/RedisClient.java b/src/main/java/org/redisson/client/RedisClient.java new file mode 100644 index 000000000..85326892a --- /dev/null +++ b/src/main/java/org/redisson/client/RedisClient.java @@ -0,0 +1,92 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client; + +import java.net.InetSocketAddress; + +import org.redisson.client.handler.RedisCommandsQueue; +import org.redisson.client.handler.RedisData; +import org.redisson.client.handler.RedisDecoder; +import org.redisson.client.handler.RedisEncoder; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; + +public class RedisClient { + + private Class socketChannelClass = NioSocketChannel.class; + private Bootstrap bootstrap; + private EventLoopGroup group = new NioEventLoopGroup(); + private InetSocketAddress addr; + private Channel channel; + + public RedisClient(String host, int port) { + addr = new InetSocketAddress(host, port); + bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr); + bootstrap.handler(new ChannelInitializer() { + + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addFirst( + new RedisEncoder(), + new RedisCommandsQueue(), + new RedisDecoder()); + } + + }); + } + + public ChannelFuture connect() { + ChannelFuture future = bootstrap.connect(); + channel = future.channel(); + return future; + } + + public Future execute(RedisCommand command, Object ... params) { + Promise promise = bootstrap.group().next().newPromise(); + channel.writeAndFlush(new RedisData(promise, command, params)); + return promise; + } + + public static void main(String[] args) throws InterruptedException { + RedisClient rc = new RedisClient("127.0.0.1", 6379); + rc.connect().sync(); + for (int i = 0; i < 10000; i++) { + final int j = i; + Future res = rc.execute(RedisCommands.SET, "test", "" + Math.random()); + res.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) throws Exception { + System.out.println("res: " + future.getNow() + " " + j); + } + + }); + } +// rc.execute(RedisCommands.GET, "test"); + } +} diff --git a/src/main/java/org/redisson/client/handler/RedisCommandsQueue.java b/src/main/java/org/redisson/client/handler/RedisCommandsQueue.java new file mode 100644 index 000000000..bf4f6fbb8 --- /dev/null +++ b/src/main/java/org/redisson/client/handler/RedisCommandsQueue.java @@ -0,0 +1,68 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.handler; + +import java.util.Queue; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.util.AttributeKey; +import io.netty.util.concurrent.Promise; +import io.netty.util.internal.PlatformDependent; + +public class RedisCommandsQueue extends ChannelDuplexHandler { + + public enum QueueCommands {NEXT_COMMAND} + + public static final AttributeKey> REPLAY_PROMISE = AttributeKey.valueOf("promise"); + + private final Queue> queue = PlatformDependent.newMpscQueue(); + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt == QueueCommands.NEXT_COMMAND) { + queue.poll(); + sendData(ctx); + } else { + super.userEventTriggered(ctx, evt); + } + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof RedisData) { + RedisData data = (RedisData) msg; + if (data.getSend().get()) { + super.write(ctx, msg, promise); + } else { + queue.add(data); + sendData(ctx); + } + } else { + super.write(ctx, msg, promise); + } + } + + private void sendData(ChannelHandlerContext ctx) throws Exception { + RedisData data = queue.peek(); + if (data != null && data.getSend().compareAndSet(false, true)) { + ctx.channel().attr(REPLAY_PROMISE).set(data.getPromise()); + ctx.channel().writeAndFlush(data); + } + } + +} diff --git a/src/main/java/org/redisson/client/handler/RedisData.java b/src/main/java/org/redisson/client/handler/RedisData.java new file mode 100644 index 000000000..03a30552a --- /dev/null +++ b/src/main/java/org/redisson/client/handler/RedisData.java @@ -0,0 +1,53 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.handler; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.redisson.client.protocol.RedisCommand; + +import io.netty.util.concurrent.Promise; + +public class RedisData { + + Promise promise; + RedisCommand command; + Object[] params; + AtomicBoolean send = new AtomicBoolean(); + + public RedisData(Promise promise, RedisCommand command, Object[] params) { + this.promise = promise; + this.command = command; + this.params = params; + } + + public RedisCommand getCommand() { + return command; + } + + public Object[] getParams() { + return params; + } + + public Promise getPromise() { + return promise; + } + + public AtomicBoolean getSend() { + return send; + } + +} diff --git a/src/main/java/org/redisson/client/handler/RedisDecoder.java b/src/main/java/org/redisson/client/handler/RedisDecoder.java new file mode 100644 index 000000000..c6a4e5527 --- /dev/null +++ b/src/main/java/org/redisson/client/handler/RedisDecoder.java @@ -0,0 +1,70 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.handler; + +import java.util.List; + +import org.redisson.client.handler.RedisCommandsQueue.QueueCommands; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ReplayingDecoder; +import io.netty.util.CharsetUtil; + +public class RedisDecoder extends ReplayingDecoder { + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + int code = in.readByte(); + String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); + in.skipBytes(2); + out.add(status); + System.out.println("status: " + status); + + ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove().setSuccess(status); + + ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); +// switch (code) { +// case StatusReply.MARKER: { +// String status = is.readBytes(is.bytesBefore((byte) '\r')).toString(Charsets.UTF_8); +// is.skipBytes(2); +// return new StatusReply(status); +// } +// case ErrorReply.MARKER: { +// String error = is.readBytes(is.bytesBefore((byte) '\r')).toString(Charsets.UTF_8); +// is.skipBytes(2); +// return new ErrorReply(error); +// } +// case IntegerReply.MARKER: { +// return new IntegerReply(readLong(is)); +// } +// case BulkReply.MARKER: { +// return new BulkReply(readBytes(is)); +// } +// case MultiBulkReply.MARKER: { +// if (reply == null) { +// return decodeMultiBulkReply(is); +// } else { +// return new RedisReplyDecoder(false).decodeMultiBulkReply(is); +// } +// } +// default: { +// throw new IOException("Unexpected character in stream: " + code); +// } +// } + } + +} diff --git a/src/main/java/org/redisson/client/handler/RedisEncoder.java b/src/main/java/org/redisson/client/handler/RedisEncoder.java new file mode 100644 index 000000000..ca3b5e284 --- /dev/null +++ b/src/main/java/org/redisson/client/handler/RedisEncoder.java @@ -0,0 +1,125 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.util.AttributeKey; +import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.Promise; + +public class RedisEncoder extends MessageToByteEncoder> { + + final char ARGS_PREFIX = '*'; + final char BYTES_PREFIX = '$'; + final byte[] CRLF = "\r\n".getBytes(); + + @Override + protected void encode(ChannelHandlerContext ctx, RedisData msg, ByteBuf out) throws Exception { + out.writeByte(ARGS_PREFIX); + out.writeBytes(toChars(1 + msg.getParams().length)); + out.writeBytes(CRLF); + + writeArgument(out, msg.getCommand().getName().getBytes("UTF-8")); + for (Object param : msg.getParams()) { + writeArgument(out, param.toString().getBytes("UTF-8")); + } + + String o = out.toString(CharsetUtil.UTF_8); + System.out.println(o); + } + + private void writeArgument(ByteBuf out, byte[] arg) { + out.writeByte(BYTES_PREFIX); + out.writeBytes(toChars(arg.length)); + out.writeBytes(CRLF); + out.writeBytes(arg); + out.writeBytes(CRLF); + } + + final static char[] DigitTens = {'0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '1', '1', '1', + '1', '1', '1', '1', '1', '1', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '3', '3', '3', + '3', '3', '3', '3', '3', '3', '3', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '5', '5', + '5', '5', '5', '5', '5', '5', '5', '5', '6', '6', '6', '6', '6', '6', '6', '6', '6', '6', '7', + '7', '7', '7', '7', '7', '7', '7', '7', '7', '8', '8', '8', '8', '8', '8', '8', '8', '8', '8', + '9', '9', '9', '9', '9', '9', '9', '9', '9', '9',}; + + final static char[] DigitOnes = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', + '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', + '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', + '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', + '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',}; + + final static char[] digits = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', + 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', + 'y', 'z'}; + + final static int[] sizeTable = {9, 99, 999, 9999, 99999, 999999, 9999999, 99999999, 999999999, + Integer.MAX_VALUE}; + + // Requires positive x + static int stringSize(long x) { + for (int i = 0;; i++) + if (x <= sizeTable[i]) + return i + 1; + } + + static void getChars(long i, int index, byte[] buf) { + long q, r; + int charPos = index; + byte sign = 0; + + if (i < 0) { + sign = '-'; + i = -i; + } + + // Generate two digits per iteration + while (i >= 65536) { + q = i / 100; + // really: r = i - (q * 100); + r = i - ((q << 6) + (q << 5) + (q << 2)); + i = q; + buf[--charPos] = (byte) DigitOnes[(int)r]; + buf[--charPos] = (byte) DigitTens[(int)r]; + } + + // Fall thru to fast mode for smaller numbers + // assert(i <= 65536, i); + for (;;) { + q = (i * 52429) >>> (16 + 3); + r = i - ((q << 3) + (q << 1)); // r = i-(q*10) ... + buf[--charPos] = (byte) digits[(int)r]; + i = q; + if (i == 0) + break; + } + if (sign != 0) { + buf[--charPos] = sign; + } + } + + public static byte[] toChars(long i) { + int size = (i < 0) ? stringSize(-i) + 1 : stringSize(i); + byte[] buf = new byte[size]; + getChars(i, size, buf); + return buf; + } + + +} diff --git a/src/main/java/org/redisson/client/protocol/ObjectDecoder.java b/src/main/java/org/redisson/client/protocol/ObjectDecoder.java new file mode 100644 index 000000000..14ae5d485 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/ObjectDecoder.java @@ -0,0 +1,20 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol; + +public class ObjectDecoder implements ResponseDecoder { + +} diff --git a/src/main/java/org/redisson/client/protocol/RedisCommand.java b/src/main/java/org/redisson/client/protocol/RedisCommand.java new file mode 100644 index 000000000..f381d6d78 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/RedisCommand.java @@ -0,0 +1,38 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol; + +public class RedisCommand { + + private String name; + private ResponseDecoder reponseDecoder; + + public RedisCommand(String name, ResponseDecoder reponseDecoder) { + super(); + this.name = name; + this.reponseDecoder = reponseDecoder; + } + + public String getName() { + return name; + } + + public ResponseDecoder getReponseDecoder() { + return reponseDecoder; + } + + +} diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java new file mode 100644 index 000000000..93a6b0f9a --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -0,0 +1,23 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol; + +public interface RedisCommands { + + RedisCommand GET = new RedisCommand("GET", new ObjectDecoder()); + RedisCommand SET = new RedisCommand("SET", new StringDecoder()); + +} diff --git a/src/main/java/org/redisson/client/protocol/ResponseDecoder.java b/src/main/java/org/redisson/client/protocol/ResponseDecoder.java new file mode 100644 index 000000000..2ad1f8289 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/ResponseDecoder.java @@ -0,0 +1,20 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol; + +public interface ResponseDecoder { + +} diff --git a/src/main/java/org/redisson/client/protocol/StringDecoder.java b/src/main/java/org/redisson/client/protocol/StringDecoder.java new file mode 100644 index 000000000..0f9ea8a40 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/StringDecoder.java @@ -0,0 +1,20 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol; + +public class StringDecoder implements ResponseDecoder { + +}