PubSub support

pull/243/head
Nikita 10 years ago
parent 6c182ed77e
commit e5da696339

@ -43,7 +43,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override
public void addFirst(final V e) {
connectionManager.write(new VoidOperation<V, Long>() {
connectionManager.write(getName(), new VoidOperation<V, Long>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, V> async) {
return async.lpush(getName(), e);
@ -53,7 +53,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override
public void addLast(final V e) {
connectionManager.write(new VoidOperation<V, Long>() {
connectionManager.write(getName(), new VoidOperation<V, Long>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, V> async) {
return async.rpush(getName(), e);
@ -99,7 +99,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override
public V getLast() {
List<V> list = connectionManager.read(new ResultOperation<List<V>, V>() {
List<V> list = connectionManager.read(getName(), new ResultOperation<List<V>, V>() {
@Override
protected Future<List<V>> execute(RedisAsyncConnection<Object, V> async) {
return async.lrange(getName(), -1, -1);
@ -113,7 +113,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override
public boolean offerFirst(final V e) {
connectionManager.write(new ResultOperation<Long, Object>() {
connectionManager.write(getName(), new ResultOperation<Long, Object>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, Object> async) {
return async.lpush(getName(), e);
@ -134,7 +134,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override
public V peekLast() {
List<V> list = connectionManager.read(new ResultOperation<List<V>, V>() {
List<V> list = connectionManager.read(getName(), new ResultOperation<List<V>, V>() {
@Override
protected Future<List<V>> execute(RedisAsyncConnection<Object, V> async) {
return async.lrange(getName(), -1, -1);
@ -153,7 +153,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override
public V pollLast() {
return connectionManager.write(new ResultOperation<V, V>() {
return connectionManager.write(getName(), new ResultOperation<V, V>() {
@Override
protected Future<V> execute(RedisAsyncConnection<Object, V> async) {
return async.rpop(getName());
@ -178,7 +178,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override
public V removeLast() {
V value = connectionManager.write(new ResultOperation<V, V>() {
V value = connectionManager.write(getName(), new ResultOperation<V, V>() {
@Override
protected Future<V> execute(RedisAsyncConnection<Object, V> async) {
return async.rpop(getName());

@ -74,7 +74,7 @@ abstract class RedissonExpirable extends RedissonObject implements RExpirable {
@Override
public long remainTimeToLive() {
return connectionManager.write(new ResultOperation<Long, Object>() {
return connectionManager.write(getName(), new ResultOperation<Long, Object>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, Object> async) {
return async.ttl(getName());

@ -393,7 +393,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public boolean isLocked() {
return connectionManager.read(new SyncOperation<Boolean, Boolean>() {
return connectionManager.read(getName(), new SyncOperation<Boolean, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, Boolean> conn) {
return conn.exists(getName());

@ -94,7 +94,7 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
@Override
public V pollLastAndOfferFirstTo(final String queueName) {
return connectionManager.write(new ResultOperation<V, V>() {
return connectionManager.write(getName(), new ResultOperation<V, V>() {
@Override
protected Future<V> execute(RedisAsyncConnection<Object, V> async) {
return async.rpoplpush(getName(), queueName);

@ -16,12 +16,12 @@
package org.redisson.client;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import org.redisson.client.handler.RedisCommandsQueue;
import org.redisson.client.handler.RedisDecoder;
import org.redisson.client.handler.RedisEncoder;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;
import org.redisson.client.protocol.PubSubMessage;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@ -35,6 +35,7 @@ import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
public class RedisClient {
@ -56,8 +57,7 @@ public class RedisClient {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addFirst(
new RedisEncoder(),
ch.pipeline().addFirst(new RedisEncoder(),
new RedisCommandsQueue(),
new RedisDecoder());
}
@ -87,20 +87,34 @@ public class RedisClient {
return new RedisConnection(this, future.channel());
}
public RedisPubSubConnection connectPubSub() {
ChannelFuture future = bootstrap.connect();
future.syncUninterruptibly();
channels.add(future.channel());
return new RedisPubSubConnection(this, future.channel());
}
public ChannelGroupFuture shutdownAsync() {
return channels.close();
}
public static void main(String[] args) throws InterruptedException {
public static void main(String[] args) throws InterruptedException, ExecutionException {
final RedisClient c = new RedisClient("127.0.0.1", 6379);
RedisConnection rc = c.connect();
// for (int i = 0; i < 10000; i++) {
String res1 = rc.sync(RedisCommands.CLIENT_SETNAME, "12333");
System.out.println("res 12: " + res1);
String res2 = rc.sync(RedisCommands.CLIENT_GETNAME);
System.out.println("res name: " + res2);
Boolean res3 = rc.sync(new StringCodec(), RedisCommands.EXISTS, "33");
System.out.println("res name 2: " + res3);
RedisPubSubConnection rpsc = c.connectPubSub();
// String res1 = rc.sync(RedisCommands.CLIENT_SETNAME, "12333");
// System.out.println("res 12: " + res1);
// String res2 = rc.sync(RedisCommands.CLIENT_GETNAME);
// System.out.println("res name: " + res2);
// Boolean res3 = rc.sync(new StringCodec(), RedisCommands.EXISTS, "33");
// System.out.println("res name 2: " + res3);
Future<Long> m = rpsc.publish("sss", "123");
System.out.println("out: " + m.get());
Future<PubSubMessage> m1 = rpsc.subscribe("sss");
System.out.println("out: " + m1.get());
/* Future<String> res = rc.execute(new StringCodec(), RedisCommands.SET, "test", "" + Math.random());

@ -0,0 +1,44 @@
package org.redisson.client;
import org.redisson.client.handler.RedisData;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.PubSubMessage;
import org.redisson.client.protocol.PubSubMessageDecoder;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
public class RedisPubSubConnection {
final Channel channel;
final RedisClient redisClient;
public RedisPubSubConnection(RedisClient redisClient, Channel channel) {
this.redisClient = redisClient;
this.channel = channel;
}
public Future<PubSubMessage> subscribe(String ... channel) {
return async(new PubSubMessageDecoder(), RedisCommands.SUBSCRIBE, channel);
}
public Future<Long> publish(String channel, String msg) {
return async(new StringCodec(), RedisCommands.PUBLISH, channel, msg);
}
public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
channel.writeAndFlush(new RedisData<T, R>(promise, encoder, command, params));
return promise;
}
public ChannelFuture closeAsync() {
return channel.close();
}
}

@ -16,6 +16,7 @@
package org.redisson.client.handler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.RedisException;
@ -25,39 +26,71 @@ import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil;
public class RedisDecoder extends ReplayingDecoder<Void> {
private static final char CR = '\r';
private static final char LF = '\n';
public static final char CR = '\r';
public static final char LF = '\n';
private static final char ZERO = '0';
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
RedisData<Object, Object> data = ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove();
decode(in, data, null);
ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
}
private void decode(ByteBuf in, RedisData<Object, Object> data, List<Object> parts) throws IOException {
int code = in.readByte();
if (code == '+') {
Object result = data.getCommand().getReponseDecoder().decode(in);
if (parts != null) {
parts.add(result);
} else {
data.getPromise().setSuccess(result);
}
} else if (code == '-') {
Object result = data.getCommand().getReponseDecoder().decode(in);
data.getPromise().setFailure(new RedisException(result.toString()));
} else if (code == ':') {
Object result = data.getCommand().getReponseDecoder().decode(in);
String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2);
Long result = Long.valueOf(status);
if (parts != null) {
parts.add(result);
} else {
data.getPromise().setSuccess(result);
}
} else if (code == '$') {
Decoder<Object> decoder = data.getCommand().getReponseDecoder();
if (decoder == null) {
decoder = data.getCodec();
}
Object result = decoder.decode(readBytes(in));
if (parts != null) {
parts.add(result);
} else {
data.getPromise().setSuccess(result);
}
} else if (code == '*') {
long size = readLong(in);
List<Object> respParts = new ArrayList<Object>();
for (int i = 0; i < size; i++) {
decode(in, data, respParts);
}
Decoder<Object> decoder = data.getCommand().getReponseDecoder();
if (decoder == null) {
decoder = data.getCodec();
}
Object result = decoder.decode(respParts);
data.getPromise().setSuccess(result);
} else {
throw new IllegalStateException("Can't decode replay " + (char)code);
}
ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
}
public ByteBuf readBytes(ByteBuf is) throws IOException {

@ -15,6 +15,8 @@
*/
package org.redisson.client.protocol;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
@ -27,4 +29,9 @@ public class BooleanReplayDecoder implements Decoder<Boolean> {
return Boolean.valueOf(status);
}
@Override
public Boolean decode(List<Object> parts) {
throw new IllegalStateException();
}
}

@ -15,10 +15,14 @@
*/
package org.redisson.client.protocol;
import java.util.List;
import io.netty.buffer.ByteBuf;
public interface Decoder<R> {
R decode(ByteBuf buf);
R decode(List<Object> parts);
}

@ -0,0 +1,29 @@
package org.redisson.client.protocol;
public class PubSubMessage {
public enum Type {SUBSCRIBE, MESSAGE}
private Type type;
private String channel;
public PubSubMessage(Type type, String channel) {
super();
this.type = type;
this.channel = channel;
}
public String getChannel() {
return channel;
}
public Type getType() {
return type;
}
@Override
public String toString() {
return "PubSubReplay [type=" + type + ", channel=" + channel + "]";
}
}

@ -0,0 +1,47 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol;
import java.io.UnsupportedEncodingException;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class PubSubMessageDecoder implements Codec {
@Override
public String decode(ByteBuf buf) {
String status = buf.toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return status;
}
@Override
public PubSubMessage decode(List<Object> parts) {
return new PubSubMessage(PubSubMessage.Type.valueOf(parts.get(0).toString().toUpperCase()), parts.get(1).toString());
}
@Override
public byte[] encode(int paramIndex, Object in) {
try {
return in.toString().getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException(e);
}
}
}

@ -15,10 +15,6 @@
*/
package org.redisson.client.protocol;
import org.redisson.client.handler.RedisData;
import io.netty.util.concurrent.Future;
public interface RedisCommands {
RedisStringCommand AUTH = new RedisStringCommand("AUTH", new StringReplayDecoder());
@ -31,16 +27,7 @@ public interface RedisCommands {
RedisCommand<String> SETEX = new RedisCommand<String>("SETEX", new StringReplayDecoder(), 2);
RedisCommand<Boolean> EXISTS = new RedisCommand<Boolean>("EXISTS", new BooleanReplayDecoder(), 1);
String sync(RedisStringCommand command, Object ... params);
Future<String> async(RedisStringCommand command, Object ... params);
<T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params);
<T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params);
<T, R> void send(RedisData<T, R> data);
RedisCommand<Long> PUBLISH = new RedisCommand<Long>("PUBLISH", 1);
RedisCommand<PubSubMessageDecoder> SUBSCRIBE = new RedisCommand<PubSubMessageDecoder>("SUBSCRIBE", 1);
}

@ -1,6 +1,7 @@
package org.redisson.client.protocol;
import java.io.UnsupportedEncodingException;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
@ -21,4 +22,9 @@ public class StringCodec implements Codec {
return buf.toString(CharsetUtil.UTF_8);
}
@Override
public Object decode(List<Object> parts) {
throw new IllegalStateException();
}
}

@ -15,6 +15,8 @@
*/
package org.redisson.client.protocol;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
@ -27,4 +29,9 @@ public class StringReplayDecoder implements Decoder<String> {
return status;
}
@Override
public String decode(List<Object> parts) {
throw new IllegalStateException();
}
}

@ -23,6 +23,7 @@ import io.netty.util.concurrent.Future;
import org.redisson.async.AsyncOperation;
import org.redisson.async.SyncInterruptedOperation;
import org.redisson.async.SyncOperation;
import org.redisson.client.protocol.RedisCommand;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
@ -38,6 +39,12 @@ import java.util.concurrent.TimeUnit;
//TODO ping support
public interface ConnectionManager {
Future<Void> writeAsyncVoid(String key, RedisCommand<String> command, Object ... params);
<T, R> Future<R> writeAsync(String key, RedisCommand<T> command, Object ... params);
<T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ... params);
RedisClient createClient(String host, int port, int timeout);
RedisClient createClient(String host, int port);
@ -46,26 +53,16 @@ public interface ConnectionManager {
<V, R> R read(String key, SyncOperation<V, R> operation);
<V, R> R read(SyncOperation<V, R> operation);
<V, R> R write(String key, SyncInterruptedOperation<V, R> operation) throws InterruptedException;
<V, R> R write(SyncInterruptedOperation<V, R> operation) throws InterruptedException;
<V, R> R write(String key, SyncOperation<V, R> operation);
<V, R> R write(SyncOperation<V, R> operation);
<V, R> R write(String key, AsyncOperation<V, R> asyncOperation);
<V, R> R write(AsyncOperation<V, R> asyncOperation);
<V, T> Future<T> writeAllAsync(AsyncOperation<V, T> asyncOperation);
<V, T> T read(String key, AsyncOperation<V, T> asyncOperation);
<V, T> T read(AsyncOperation<V, T> asyncOperation);
<V, T> Future<T> readAsync(String key, AsyncOperation<V, T> asyncOperation);
<V, T> Future<T> readAsync(AsyncOperation<V, T> asyncOperation);

@ -15,19 +15,6 @@
*/
package org.redisson.connection;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map.Entry;
@ -45,6 +32,10 @@ import org.redisson.MasterSlaveServersConfig;
import org.redisson.async.AsyncOperation;
import org.redisson.async.SyncInterruptedOperation;
import org.redisson.async.SyncOperation;
import org.redisson.client.handler.RedisData;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.StringCodec;
import org.redisson.codec.RedisCodecWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,6 +52,19 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
/**
*
* @author Nikita Koksharov
@ -144,6 +148,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
};
}
public <T> FutureListener<T> createReleaseReadListener(final int slot, final org.redisson.client.RedisConnection conn,
final Timeout timeout) {
return new FutureListener<T>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
timeout.cancel();
releaseRead(slot, conn);
}
};
}
public <V, T> Future<T> writeAllAsync(AsyncOperation<V, T> asyncOperation) {
Promise<T> mainPromise = getGroup().next().newPromise();
AtomicInteger counter = new AtomicInteger(entries.keySet().size());
@ -285,10 +300,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return write(slot, operation, 0);
}
public <V, R> R write(SyncInterruptedOperation<V, R> operation) throws InterruptedException {
return write(-1, operation, 0);
}
private <V, R> R write(int slot, SyncInterruptedOperation<V, R> operation, int attempt) throws InterruptedException {
try {
RedisConnection<Object, V> connection = connectionWriteOp(slot);
@ -326,10 +337,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return write(slot, operation, 0);
}
public <V, R> R write(SyncOperation<V, R> operation) {
return write(-1, operation, 0);
}
private <V, R> R write(int slot, SyncOperation<V, R> operation, int attempt) {
try {
RedisConnection<Object, V> connection = connectionWriteOp(slot);
@ -365,10 +372,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return read(slot, operation, 0);
}
public <V, R> R read(SyncOperation<V, R> operation) {
return read(-1, operation, 0);
}
private <V, R> R read(int slot, SyncOperation<V, R> operation, int attempt) {
try {
RedisConnection<Object, V> connection = connectionReadOp(slot);
@ -422,10 +425,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return get(mainPromise);
}
public <V, R> R write(AsyncOperation<V, R> asyncOperation) {
return get(writeAsync(asyncOperation));
}
public <V> V get(Future<V> future) {
future.awaitUninterruptibly();
if (future.isSuccess()) {
@ -443,10 +442,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return get(mainPromise);
}
public <V, T> T read(AsyncOperation<V, T> asyncOperation) {
return get(readAsync(asyncOperation));
}
public <V, T> Future<T> readAsync(String key, AsyncOperation<V, T> asyncOperation) {
Promise<T> mainPromise = getGroup().next().newPromise();
int slot = calcSlot(key);
@ -517,6 +512,105 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
});
}
public <T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = getGroup().next().newPromise();
int slot = calcSlot(key);
async(true, slot, new StringCodec(), command, params, mainPromise, 0);
return mainPromise;
}
public Future<Void> writeAsyncVoid(String key, RedisCommand<String> command, Object ... params) {
final Promise<Void> voidPromise = getGroup().next().newPromise();
Promise<String> mainPromise = getGroup().next().newPromise();
mainPromise.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (future.isCancelled()) {
voidPromise.cancel(true);
} else {
if (future.isSuccess()) {
voidPromise.setSuccess(null);
} else {
voidPromise.setFailure(future.cause());
}
}
}
});
int slot = calcSlot(key);
async(false, slot, new StringCodec(), command, params, mainPromise, 0);
return voidPromise;
}
public <T, R> Future<R> writeAsync(String key, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = getGroup().next().newPromise();
int slot = calcSlot(key);
async(false, slot, new StringCodec(), command, params, mainPromise, 0);
return mainPromise;
}
private <V, R> void async(final boolean readOnlyMode, final int slot, final Codec codec, final RedisCommand<V> command,
final Object[] params, final Promise<R> mainPromise, final int attempt) {
final Promise<R> attemptPromise = getGroup().next().newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();
TimerTask timerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (attemptPromise.isDone()) {
return;
}
if (attempt == config.getRetryAttempts()) {
attemptPromise.setFailure(ex.get());
return;
}
attemptPromise.cancel(true);
int count = attempt + 1;
async(readOnlyMode, slot, codec, command, params, mainPromise, count);
}
};
try {
org.redisson.client.RedisConnection connection;
if (readOnlyMode) {
connection = connectionReadOp2(slot);
} else {
connection = connectionReadOp2(slot);
}
log.debug("readAsync for slot {} using {}", slot, connection.getRedisClient().getAddr());
connection.send(new RedisData<V, R>(attemptPromise, codec, command, params));
ex.set(new RedisTimeoutException());
Timeout timeout = timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS);
attemptPromise.addListener(createReleaseReadListener(slot, connection, timeout));
} catch (RedisConnectionException e) {
ex.set(e);
timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS);
}
attemptPromise.addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (future.isCancelled()) {
return;
}
// TODO cancel timeout
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
async(readOnlyMode, ex.getSlot(), codec, command, params, mainPromise, attempt);
return;
}
if (future.isSuccess()) {
mainPromise.setSuccess(future.getNow());
} else {
mainPromise.setFailure(future.cause());
}
}
});
}
@Override
public PubSubConnectionEntry getEntry(String channelName) {
return name2PubSubConnection.get(channelName);
@ -775,6 +869,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return getEntry(slot).connectionReadOp();
}
public org.redisson.client.RedisConnection connectionReadOp2(int slot) {
return null;
}
protected org.redisson.client.RedisConnection connectionWriteOp2(int slot) {
return null;
}
RedisPubSubConnection nextPubSubConnection(int slot) {
return getEntry(slot).nextPubSubConnection();
}
@ -791,6 +893,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
getEntry(slot).releaseRead(connection);
}
public void releaseRead(int slot, org.redisson.client.RedisConnection connection) {
// getEntry(slot).releaseRead(connection);
}
@Override
public void shutdown() {
for (MasterSlaveEntry entry : entries.values()) {

Loading…
Cancel
Save