diff --git a/src/main/java/org/redisson/RedissonDeque.java b/src/main/java/org/redisson/RedissonDeque.java index 234be985a..07aa169ce 100644 --- a/src/main/java/org/redisson/RedissonDeque.java +++ b/src/main/java/org/redisson/RedissonDeque.java @@ -43,7 +43,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public void addFirst(final V e) { - connectionManager.write(new VoidOperation() { + connectionManager.write(getName(), new VoidOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.lpush(getName(), e); @@ -53,7 +53,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public void addLast(final V e) { - connectionManager.write(new VoidOperation() { + connectionManager.write(getName(), new VoidOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.rpush(getName(), e); @@ -99,7 +99,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public V getLast() { - List list = connectionManager.read(new ResultOperation, V>() { + List list = connectionManager.read(getName(), new ResultOperation, V>() { @Override protected Future> execute(RedisAsyncConnection async) { return async.lrange(getName(), -1, -1); @@ -113,7 +113,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public boolean offerFirst(final V e) { - connectionManager.write(new ResultOperation() { + connectionManager.write(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.lpush(getName(), e); @@ -134,7 +134,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public V peekLast() { - List list = connectionManager.read(new ResultOperation, V>() { + List list = connectionManager.read(getName(), new ResultOperation, V>() { @Override protected Future> execute(RedisAsyncConnection async) { return async.lrange(getName(), -1, -1); @@ -153,7 +153,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public V pollLast() { - return connectionManager.write(new ResultOperation() { + return connectionManager.write(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.rpop(getName()); @@ -178,7 +178,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public V removeLast() { - V value = connectionManager.write(new ResultOperation() { + V value = connectionManager.write(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.rpop(getName()); diff --git a/src/main/java/org/redisson/RedissonExpirable.java b/src/main/java/org/redisson/RedissonExpirable.java index d864d1a20..3fbc7842c 100644 --- a/src/main/java/org/redisson/RedissonExpirable.java +++ b/src/main/java/org/redisson/RedissonExpirable.java @@ -74,7 +74,7 @@ abstract class RedissonExpirable extends RedissonObject implements RExpirable { @Override public long remainTimeToLive() { - return connectionManager.write(new ResultOperation() { + return connectionManager.write(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.ttl(getName()); diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index 23bf1c072..4d1221f51 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -393,7 +393,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public boolean isLocked() { - return connectionManager.read(new SyncOperation() { + return connectionManager.read(getName(), new SyncOperation() { @Override public Boolean execute(RedisConnection conn) { return conn.exists(getName()); diff --git a/src/main/java/org/redisson/RedissonQueue.java b/src/main/java/org/redisson/RedissonQueue.java index e6f36a830..f4b9cb072 100644 --- a/src/main/java/org/redisson/RedissonQueue.java +++ b/src/main/java/org/redisson/RedissonQueue.java @@ -94,7 +94,7 @@ public class RedissonQueue extends RedissonList implements RQueue { @Override public V pollLastAndOfferFirstTo(final String queueName) { - return connectionManager.write(new ResultOperation() { + return connectionManager.write(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.rpoplpush(getName(), queueName); diff --git a/src/main/java/org/redisson/client/RedisClient.java b/src/main/java/org/redisson/client/RedisClient.java index 67e664358..658adf055 100644 --- a/src/main/java/org/redisson/client/RedisClient.java +++ b/src/main/java/org/redisson/client/RedisClient.java @@ -16,12 +16,12 @@ package org.redisson.client; import java.net.InetSocketAddress; +import java.util.concurrent.ExecutionException; import org.redisson.client.handler.RedisCommandsQueue; import org.redisson.client.handler.RedisDecoder; import org.redisson.client.handler.RedisEncoder; -import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.StringCodec; +import org.redisson.client.protocol.PubSubMessage; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -35,6 +35,7 @@ import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; public class RedisClient { @@ -56,10 +57,9 @@ public class RedisClient { @Override protected void initChannel(Channel ch) throws Exception { - ch.pipeline().addFirst( - new RedisEncoder(), - new RedisCommandsQueue(), - new RedisDecoder()); + ch.pipeline().addFirst(new RedisEncoder(), + new RedisCommandsQueue(), + new RedisDecoder()); } }); @@ -87,20 +87,34 @@ public class RedisClient { return new RedisConnection(this, future.channel()); } + public RedisPubSubConnection connectPubSub() { + ChannelFuture future = bootstrap.connect(); + future.syncUninterruptibly(); + channels.add(future.channel()); + return new RedisPubSubConnection(this, future.channel()); + } + public ChannelGroupFuture shutdownAsync() { return channels.close(); } - public static void main(String[] args) throws InterruptedException { + public static void main(String[] args) throws InterruptedException, ExecutionException { final RedisClient c = new RedisClient("127.0.0.1", 6379); RedisConnection rc = c.connect(); -// for (int i = 0; i < 10000; i++) { - String res1 = rc.sync(RedisCommands.CLIENT_SETNAME, "12333"); - System.out.println("res 12: " + res1); - String res2 = rc.sync(RedisCommands.CLIENT_GETNAME); - System.out.println("res name: " + res2); - Boolean res3 = rc.sync(new StringCodec(), RedisCommands.EXISTS, "33"); - System.out.println("res name 2: " + res3); + RedisPubSubConnection rpsc = c.connectPubSub(); + +// String res1 = rc.sync(RedisCommands.CLIENT_SETNAME, "12333"); +// System.out.println("res 12: " + res1); +// String res2 = rc.sync(RedisCommands.CLIENT_GETNAME); +// System.out.println("res name: " + res2); +// Boolean res3 = rc.sync(new StringCodec(), RedisCommands.EXISTS, "33"); +// System.out.println("res name 2: " + res3); + + Future m = rpsc.publish("sss", "123"); + System.out.println("out: " + m.get()); + Future m1 = rpsc.subscribe("sss"); + System.out.println("out: " + m1.get()); + /* Future res = rc.execute(new StringCodec(), RedisCommands.SET, "test", "" + Math.random()); diff --git a/src/main/java/org/redisson/client/RedisPubSubConnection.java b/src/main/java/org/redisson/client/RedisPubSubConnection.java new file mode 100644 index 000000000..81ed41a86 --- /dev/null +++ b/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -0,0 +1,44 @@ +package org.redisson.client; + +import org.redisson.client.handler.RedisData; +import org.redisson.client.protocol.Codec; +import org.redisson.client.protocol.PubSubMessage; +import org.redisson.client.protocol.PubSubMessageDecoder; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.StringCodec; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; + +public class RedisPubSubConnection { + + final Channel channel; + final RedisClient redisClient; + + public RedisPubSubConnection(RedisClient redisClient, Channel channel) { + this.redisClient = redisClient; + this.channel = channel; + } + + public Future subscribe(String ... channel) { + return async(new PubSubMessageDecoder(), RedisCommands.SUBSCRIBE, channel); + } + + public Future publish(String channel, String msg) { + return async(new StringCodec(), RedisCommands.PUBLISH, channel, msg); + } + + public Future async(Codec encoder, RedisCommand command, Object ... params) { + Promise promise = redisClient.getBootstrap().group().next().newPromise(); + channel.writeAndFlush(new RedisData(promise, encoder, command, params)); + return promise; + } + + public ChannelFuture closeAsync() { + return channel.close(); + } + +} diff --git a/src/main/java/org/redisson/client/handler/RedisDecoder.java b/src/main/java/org/redisson/client/handler/RedisDecoder.java index 5b822df7d..5785b3d09 100644 --- a/src/main/java/org/redisson/client/handler/RedisDecoder.java +++ b/src/main/java/org/redisson/client/handler/RedisDecoder.java @@ -16,6 +16,7 @@ package org.redisson.client.handler; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.redisson.client.RedisException; @@ -25,39 +26,71 @@ import org.redisson.client.protocol.Decoder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; +import io.netty.util.CharsetUtil; public class RedisDecoder extends ReplayingDecoder { - private static final char CR = '\r'; - private static final char LF = '\n'; + public static final char CR = '\r'; + public static final char LF = '\n'; private static final char ZERO = '0'; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { RedisData data = ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove(); + decode(in, data, null); + + ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); + } + + private void decode(ByteBuf in, RedisData data, List parts) throws IOException { int code = in.readByte(); if (code == '+') { Object result = data.getCommand().getReponseDecoder().decode(in); - data.getPromise().setSuccess(result); + if (parts != null) { + parts.add(result); + } else { + data.getPromise().setSuccess(result); + } } else if (code == '-') { Object result = data.getCommand().getReponseDecoder().decode(in); data.getPromise().setFailure(new RedisException(result.toString())); } else if (code == ':') { - Object result = data.getCommand().getReponseDecoder().decode(in); - data.getPromise().setSuccess(result); + String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); + in.skipBytes(2); + Long result = Long.valueOf(status); + if (parts != null) { + parts.add(result); + } else { + data.getPromise().setSuccess(result); + } } else if (code == '$') { Decoder decoder = data.getCommand().getReponseDecoder(); if (decoder == null) { decoder = data.getCodec(); } Object result = decoder.decode(readBytes(in)); + if (parts != null) { + parts.add(result); + } else { + data.getPromise().setSuccess(result); + } + } else if (code == '*') { + long size = readLong(in); + List respParts = new ArrayList(); + for (int i = 0; i < size; i++) { + decode(in, data, respParts); + } + + Decoder decoder = data.getCommand().getReponseDecoder(); + if (decoder == null) { + decoder = data.getCodec(); + } + Object result = decoder.decode(respParts); data.getPromise().setSuccess(result); } else { throw new IllegalStateException("Can't decode replay " + (char)code); } - - ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); } public ByteBuf readBytes(ByteBuf is) throws IOException { diff --git a/src/main/java/org/redisson/client/protocol/BooleanReplayDecoder.java b/src/main/java/org/redisson/client/protocol/BooleanReplayDecoder.java index a0b7cb92b..4cf511022 100644 --- a/src/main/java/org/redisson/client/protocol/BooleanReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/BooleanReplayDecoder.java @@ -15,6 +15,8 @@ */ package org.redisson.client.protocol; +import java.util.List; + import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; @@ -27,4 +29,9 @@ public class BooleanReplayDecoder implements Decoder { return Boolean.valueOf(status); } + @Override + public Boolean decode(List parts) { + throw new IllegalStateException(); + } + } diff --git a/src/main/java/org/redisson/client/protocol/Decoder.java b/src/main/java/org/redisson/client/protocol/Decoder.java index 7f8f22b2c..7f1f672fc 100644 --- a/src/main/java/org/redisson/client/protocol/Decoder.java +++ b/src/main/java/org/redisson/client/protocol/Decoder.java @@ -15,10 +15,14 @@ */ package org.redisson.client.protocol; +import java.util.List; + import io.netty.buffer.ByteBuf; public interface Decoder { R decode(ByteBuf buf); + R decode(List parts); + } diff --git a/src/main/java/org/redisson/client/protocol/PubSubMessage.java b/src/main/java/org/redisson/client/protocol/PubSubMessage.java new file mode 100644 index 000000000..e8aaefbf3 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/PubSubMessage.java @@ -0,0 +1,29 @@ +package org.redisson.client.protocol; + +public class PubSubMessage { + + public enum Type {SUBSCRIBE, MESSAGE} + + private Type type; + private String channel; + + public PubSubMessage(Type type, String channel) { + super(); + this.type = type; + this.channel = channel; + } + + public String getChannel() { + return channel; + } + + public Type getType() { + return type; + } + + @Override + public String toString() { + return "PubSubReplay [type=" + type + ", channel=" + channel + "]"; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/PubSubMessageDecoder.java b/src/main/java/org/redisson/client/protocol/PubSubMessageDecoder.java new file mode 100644 index 000000000..889f876f9 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/PubSubMessageDecoder.java @@ -0,0 +1,47 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol; + +import java.io.UnsupportedEncodingException; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class PubSubMessageDecoder implements Codec { + + @Override + public String decode(ByteBuf buf) { + String status = buf.toString(CharsetUtil.UTF_8); + buf.skipBytes(2); + return status; + } + + @Override + public PubSubMessage decode(List parts) { + return new PubSubMessage(PubSubMessage.Type.valueOf(parts.get(0).toString().toUpperCase()), parts.get(1).toString()); + } + + @Override + public byte[] encode(int paramIndex, Object in) { + try { + return in.toString().getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index a6c5d755a..2c85bf2a6 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -15,10 +15,6 @@ */ package org.redisson.client.protocol; -import org.redisson.client.handler.RedisData; - -import io.netty.util.concurrent.Future; - public interface RedisCommands { RedisStringCommand AUTH = new RedisStringCommand("AUTH", new StringReplayDecoder()); @@ -31,16 +27,7 @@ public interface RedisCommands { RedisCommand SETEX = new RedisCommand("SETEX", new StringReplayDecoder(), 2); RedisCommand EXISTS = new RedisCommand("EXISTS", new BooleanReplayDecoder(), 1); - - - String sync(RedisStringCommand command, Object ... params); - - Future async(RedisStringCommand command, Object ... params); - - R sync(Codec encoder, RedisCommand command, Object ... params); - - Future async(Codec encoder, RedisCommand command, Object ... params); - - void send(RedisData data); + RedisCommand PUBLISH = new RedisCommand("PUBLISH", 1); + RedisCommand SUBSCRIBE = new RedisCommand("SUBSCRIBE", 1); } diff --git a/src/main/java/org/redisson/client/protocol/StringCodec.java b/src/main/java/org/redisson/client/protocol/StringCodec.java index 4db8fa9c6..3554427c9 100644 --- a/src/main/java/org/redisson/client/protocol/StringCodec.java +++ b/src/main/java/org/redisson/client/protocol/StringCodec.java @@ -1,6 +1,7 @@ package org.redisson.client.protocol; import java.io.UnsupportedEncodingException; +import java.util.List; import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; @@ -21,4 +22,9 @@ public class StringCodec implements Codec { return buf.toString(CharsetUtil.UTF_8); } + @Override + public Object decode(List parts) { + throw new IllegalStateException(); + } + } diff --git a/src/main/java/org/redisson/client/protocol/StringReplayDecoder.java b/src/main/java/org/redisson/client/protocol/StringReplayDecoder.java index 05d841794..ba4ceae0f 100644 --- a/src/main/java/org/redisson/client/protocol/StringReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/StringReplayDecoder.java @@ -15,6 +15,8 @@ */ package org.redisson.client.protocol; +import java.util.List; + import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; @@ -27,4 +29,9 @@ public class StringReplayDecoder implements Decoder { return status; } + @Override + public String decode(List parts) { + throw new IllegalStateException(); + } + } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index bbf0cc87c..1c64a57b1 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -23,6 +23,7 @@ import io.netty.util.concurrent.Future; import org.redisson.async.AsyncOperation; import org.redisson.async.SyncInterruptedOperation; import org.redisson.async.SyncOperation; +import org.redisson.client.protocol.RedisCommand; import com.lambdaworks.redis.RedisClient; import com.lambdaworks.redis.RedisConnection; @@ -38,6 +39,12 @@ import java.util.concurrent.TimeUnit; //TODO ping support public interface ConnectionManager { + Future writeAsyncVoid(String key, RedisCommand command, Object ... params); + + Future writeAsync(String key, RedisCommand command, Object ... params); + + Future readAsync(String key, RedisCommand command, Object ... params); + RedisClient createClient(String host, int port, int timeout); RedisClient createClient(String host, int port); @@ -46,26 +53,16 @@ public interface ConnectionManager { R read(String key, SyncOperation operation); - R read(SyncOperation operation); - R write(String key, SyncInterruptedOperation operation) throws InterruptedException; - R write(SyncInterruptedOperation operation) throws InterruptedException; - R write(String key, SyncOperation operation); - R write(SyncOperation operation); - R write(String key, AsyncOperation asyncOperation); - R write(AsyncOperation asyncOperation); - Future writeAllAsync(AsyncOperation asyncOperation); T read(String key, AsyncOperation asyncOperation); - T read(AsyncOperation asyncOperation); - Future readAsync(String key, AsyncOperation asyncOperation); Future readAsync(AsyncOperation asyncOperation); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 1e5123dd1..12420a0c8 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -15,19 +15,6 @@ */ package org.redisson.connection; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.epoll.EpollSocketChannel; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; - import java.util.Collection; import java.util.HashSet; import java.util.Map.Entry; @@ -45,6 +32,10 @@ import org.redisson.MasterSlaveServersConfig; import org.redisson.async.AsyncOperation; import org.redisson.async.SyncInterruptedOperation; import org.redisson.async.SyncOperation; +import org.redisson.client.handler.RedisData; +import org.redisson.client.protocol.Codec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.StringCodec; import org.redisson.codec.RedisCodecWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +52,19 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; import com.lambdaworks.redis.pubsub.RedisPubSubConnection; import com.lambdaworks.redis.pubsub.RedisPubSubListener; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; + /** * * @author Nikita Koksharov @@ -144,6 +148,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager { }; } + public FutureListener createReleaseReadListener(final int slot, final org.redisson.client.RedisConnection conn, + final Timeout timeout) { + return new FutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + timeout.cancel(); + releaseRead(slot, conn); + } + }; + } + public Future writeAllAsync(AsyncOperation asyncOperation) { Promise mainPromise = getGroup().next().newPromise(); AtomicInteger counter = new AtomicInteger(entries.keySet().size()); @@ -285,10 +300,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return write(slot, operation, 0); } - public R write(SyncInterruptedOperation operation) throws InterruptedException { - return write(-1, operation, 0); - } - private R write(int slot, SyncInterruptedOperation operation, int attempt) throws InterruptedException { try { RedisConnection connection = connectionWriteOp(slot); @@ -326,10 +337,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return write(slot, operation, 0); } - public R write(SyncOperation operation) { - return write(-1, operation, 0); - } - private R write(int slot, SyncOperation operation, int attempt) { try { RedisConnection connection = connectionWriteOp(slot); @@ -365,10 +372,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return read(slot, operation, 0); } - public R read(SyncOperation operation) { - return read(-1, operation, 0); - } - private R read(int slot, SyncOperation operation, int attempt) { try { RedisConnection connection = connectionReadOp(slot); @@ -422,10 +425,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return get(mainPromise); } - public R write(AsyncOperation asyncOperation) { - return get(writeAsync(asyncOperation)); - } - public V get(Future future) { future.awaitUninterruptibly(); if (future.isSuccess()) { @@ -443,10 +442,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return get(mainPromise); } - public T read(AsyncOperation asyncOperation) { - return get(readAsync(asyncOperation)); - } - public Future readAsync(String key, AsyncOperation asyncOperation) { Promise mainPromise = getGroup().next().newPromise(); int slot = calcSlot(key); @@ -517,6 +512,105 @@ public class MasterSlaveConnectionManager implements ConnectionManager { }); } + public Future readAsync(String key, RedisCommand command, Object ... params) { + Promise mainPromise = getGroup().next().newPromise(); + int slot = calcSlot(key); + async(true, slot, new StringCodec(), command, params, mainPromise, 0); + return mainPromise; + } + + public Future writeAsyncVoid(String key, RedisCommand command, Object ... params) { + final Promise voidPromise = getGroup().next().newPromise(); + Promise mainPromise = getGroup().next().newPromise(); + mainPromise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isCancelled()) { + voidPromise.cancel(true); + } else { + if (future.isSuccess()) { + voidPromise.setSuccess(null); + } else { + voidPromise.setFailure(future.cause()); + } + } + } + }); + int slot = calcSlot(key); + async(false, slot, new StringCodec(), command, params, mainPromise, 0); + return voidPromise; + } + + public Future writeAsync(String key, RedisCommand command, Object ... params) { + Promise mainPromise = getGroup().next().newPromise(); + int slot = calcSlot(key); + async(false, slot, new StringCodec(), command, params, mainPromise, 0); + return mainPromise; + } + + private void async(final boolean readOnlyMode, final int slot, final Codec codec, final RedisCommand command, + final Object[] params, final Promise mainPromise, final int attempt) { + final Promise attemptPromise = getGroup().next().newPromise(); + final AtomicReference ex = new AtomicReference(); + + TimerTask timerTask = new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (attemptPromise.isDone()) { + return; + } + if (attempt == config.getRetryAttempts()) { + attemptPromise.setFailure(ex.get()); + return; + } + attemptPromise.cancel(true); + + int count = attempt + 1; + async(readOnlyMode, slot, codec, command, params, mainPromise, count); + } + }; + + try { + org.redisson.client.RedisConnection connection; + if (readOnlyMode) { + connection = connectionReadOp2(slot); + } else { + connection = connectionReadOp2(slot); + } + log.debug("readAsync for slot {} using {}", slot, connection.getRedisClient().getAddr()); + connection.send(new RedisData(attemptPromise, codec, command, params)); + + ex.set(new RedisTimeoutException()); + Timeout timeout = timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS); + attemptPromise.addListener(createReleaseReadListener(slot, connection, timeout)); + } catch (RedisConnectionException e) { + ex.set(e); + timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS); + } + attemptPromise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isCancelled()) { + return; + } + // TODO cancel timeout + + if (future.cause() instanceof RedisMovedException) { + RedisMovedException ex = (RedisMovedException)future.cause(); + async(readOnlyMode, ex.getSlot(), codec, command, params, mainPromise, attempt); + return; + } + + if (future.isSuccess()) { + mainPromise.setSuccess(future.getNow()); + } else { + mainPromise.setFailure(future.cause()); + } + } + }); + } + + @Override public PubSubConnectionEntry getEntry(String channelName) { return name2PubSubConnection.get(channelName); @@ -775,6 +869,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return getEntry(slot).connectionReadOp(); } + public org.redisson.client.RedisConnection connectionReadOp2(int slot) { + return null; + } + + protected org.redisson.client.RedisConnection connectionWriteOp2(int slot) { + return null; + } + RedisPubSubConnection nextPubSubConnection(int slot) { return getEntry(slot).nextPubSubConnection(); } @@ -791,6 +893,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { getEntry(slot).releaseRead(connection); } + public void releaseRead(int slot, org.redisson.client.RedisConnection connection) { +// getEntry(slot).releaseRead(connection); + } + + @Override public void shutdown() { for (MasterSlaveEntry entry : entries.values()) {