|
|
|
@ -15,12 +15,17 @@
|
|
|
|
|
*/
|
|
|
|
|
package org.redisson.connection;
|
|
|
|
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.Collection;
|
|
|
|
|
import java.util.HashSet;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map.Entry;
|
|
|
|
|
import java.util.NavigableMap;
|
|
|
|
|
import java.util.Queue;
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
import java.util.concurrent.ConcurrentSkipListMap;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
@ -29,29 +34,25 @@ import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
|
|
|
|
import org.redisson.Config;
|
|
|
|
|
import org.redisson.MasterSlaveServersConfig;
|
|
|
|
|
import org.redisson.async.AsyncOperation;
|
|
|
|
|
import org.redisson.async.SyncInterruptedOperation;
|
|
|
|
|
import org.redisson.async.SyncOperation;
|
|
|
|
|
import org.redisson.SyncOperation;
|
|
|
|
|
import org.redisson.client.RedisClient;
|
|
|
|
|
import org.redisson.client.RedisConnection;
|
|
|
|
|
import org.redisson.client.RedisConnectionException;
|
|
|
|
|
import org.redisson.client.RedisException;
|
|
|
|
|
import org.redisson.client.RedisMovedException;
|
|
|
|
|
import org.redisson.client.RedisPubSubConnection;
|
|
|
|
|
import org.redisson.client.RedisPubSubListener;
|
|
|
|
|
import org.redisson.client.RedisTimeoutException;
|
|
|
|
|
import org.redisson.client.handler.RedisData;
|
|
|
|
|
import org.redisson.client.protocol.Codec;
|
|
|
|
|
import org.redisson.client.protocol.RedisCommand;
|
|
|
|
|
import org.redisson.client.protocol.StringCodec;
|
|
|
|
|
import org.redisson.codec.RedisCodecWrapper;
|
|
|
|
|
import org.redisson.client.protocol.decoder.MultiDecoder;
|
|
|
|
|
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
|
|
|
|
|
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
|
|
import com.lambdaworks.redis.RedisAsyncConnection;
|
|
|
|
|
import com.lambdaworks.redis.RedisClient;
|
|
|
|
|
import com.lambdaworks.redis.RedisConnection;
|
|
|
|
|
import com.lambdaworks.redis.RedisConnectionException;
|
|
|
|
|
import com.lambdaworks.redis.RedisException;
|
|
|
|
|
import com.lambdaworks.redis.RedisMovedException;
|
|
|
|
|
import com.lambdaworks.redis.RedisTimeoutException;
|
|
|
|
|
import com.lambdaworks.redis.codec.RedisCodec;
|
|
|
|
|
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
|
|
|
|
|
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
|
|
|
|
|
import com.lambdaworks.redis.pubsub.RedisPubSubListener;
|
|
|
|
|
|
|
|
|
|
import io.netty.channel.EventLoopGroup;
|
|
|
|
|
import io.netty.channel.epoll.EpollEventLoopGroup;
|
|
|
|
|
import io.netty.channel.epoll.EpollSocketChannel;
|
|
|
|
@ -61,6 +62,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
|
|
|
|
|
import io.netty.util.HashedWheelTimer;
|
|
|
|
|
import io.netty.util.Timeout;
|
|
|
|
|
import io.netty.util.TimerTask;
|
|
|
|
|
import io.netty.util.concurrent.DefaultPromise;
|
|
|
|
|
import io.netty.util.concurrent.Future;
|
|
|
|
|
import io.netty.util.concurrent.FutureListener;
|
|
|
|
|
import io.netty.util.concurrent.Promise;
|
|
|
|
@ -76,7 +78,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
|
|
|
|
|
private final HashedWheelTimer timer = new HashedWheelTimer();
|
|
|
|
|
|
|
|
|
|
protected RedisCodec codec;
|
|
|
|
|
protected Codec codec;
|
|
|
|
|
|
|
|
|
|
protected EventLoopGroup group;
|
|
|
|
|
|
|
|
|
@ -115,7 +117,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
this.group = new NioEventLoopGroup(cfg.getThreads());
|
|
|
|
|
this.socketChannelClass = NioSocketChannel.class;
|
|
|
|
|
}
|
|
|
|
|
this.codec = new RedisCodecWrapper(cfg.getCodec());
|
|
|
|
|
this.codec = cfg.getCodec();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RedisClient createClient(String host, int port) {
|
|
|
|
@ -148,210 +150,158 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <T> FutureListener<T> createReleaseReadListener(final int slot, final org.redisson.client.RedisConnection conn,
|
|
|
|
|
final Timeout timeout) {
|
|
|
|
|
return new FutureListener<T>() {
|
|
|
|
|
public <T> Future<Queue<Object>> readAllAsync(RedisCommand<T> command, Object ... params) {
|
|
|
|
|
final Promise<Queue<Object>> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
Promise<Object> promise = new DefaultPromise<Object>() {
|
|
|
|
|
Queue<Object> results = new ConcurrentLinkedQueue<Object>();
|
|
|
|
|
AtomicInteger counter = new AtomicInteger(entries.keySet().size());
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
|
|
|
|
|
timeout.cancel();
|
|
|
|
|
releaseRead(slot, conn);
|
|
|
|
|
public Promise<Object> setSuccess(Object result) {
|
|
|
|
|
if (result instanceof Collection) {
|
|
|
|
|
results.addAll((Collection)result);
|
|
|
|
|
} else {
|
|
|
|
|
results.add(result);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (counter.decrementAndGet() == 0
|
|
|
|
|
&& !mainPromise.isDone()) {
|
|
|
|
|
mainPromise.setSuccess(results);
|
|
|
|
|
}
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <V, T> Future<T> writeAllAsync(AsyncOperation<V, T> asyncOperation) {
|
|
|
|
|
Promise<T> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
AtomicInteger counter = new AtomicInteger(entries.keySet().size());
|
|
|
|
|
for (Integer slot : entries.keySet()) {
|
|
|
|
|
writeAllAsync(slot, asyncOperation, counter, mainPromise, 0);
|
|
|
|
|
async(true, slot, null, new StringCodec(), command, params, promise, 0);
|
|
|
|
|
}
|
|
|
|
|
return mainPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <V, T> void writeAllAsync(final int slot, final AsyncOperation<V, T> asyncOperation, final AtomicInteger counter, final Promise<T> mainPromise, final int attempt) {
|
|
|
|
|
final Promise<T> promise = getGroup().next().newPromise();
|
|
|
|
|
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();
|
|
|
|
|
public <T> Future<Boolean> writeAllAsync(RedisCommand<T> command, Object ... params) {
|
|
|
|
|
return allAsync(false, command, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
TimerTask timerTask = new TimerTask() {
|
|
|
|
|
public <T> Future<Boolean> allAsync(boolean readOnlyMode, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
final Promise<Boolean> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
Promise<Object> promise = new DefaultPromise<Object>() {
|
|
|
|
|
AtomicInteger counter = new AtomicInteger(entries.keySet().size());
|
|
|
|
|
@Override
|
|
|
|
|
public void run(Timeout timeout) throws Exception {
|
|
|
|
|
if (promise.isDone()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (attempt == config.getRetryAttempts()) {
|
|
|
|
|
promise.setFailure(ex.get());
|
|
|
|
|
return;
|
|
|
|
|
public Promise<Object> setSuccess(Object result) {
|
|
|
|
|
if (counter.decrementAndGet() == 0
|
|
|
|
|
&& !mainPromise.isDone()) {
|
|
|
|
|
mainPromise.setSuccess(true);
|
|
|
|
|
}
|
|
|
|
|
promise.cancel(true);
|
|
|
|
|
|
|
|
|
|
int count = attempt + 1;
|
|
|
|
|
writeAllAsync(slot, asyncOperation, counter, mainPromise, count);
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
for (Integer slot : entries.keySet()) {
|
|
|
|
|
async(readOnlyMode, slot, null, new StringCodec(), command, params, promise, 0);
|
|
|
|
|
}
|
|
|
|
|
return mainPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
RedisConnection<Object, V> connection = connectionWriteOp(slot);
|
|
|
|
|
RedisAsyncConnection<Object, V> async = connection.getAsync();
|
|
|
|
|
asyncOperation.execute(promise, async);
|
|
|
|
|
|
|
|
|
|
ex.set(new RedisTimeoutException());
|
|
|
|
|
Timeout timeout = timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS);
|
|
|
|
|
promise.addListener(createReleaseWriteListener(slot, connection, timeout));
|
|
|
|
|
} catch (RedisConnectionException e) {
|
|
|
|
|
ex.set(e);
|
|
|
|
|
timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS);
|
|
|
|
|
private int calcSlot(String key) {
|
|
|
|
|
if (entries.size() == 1) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
promise.addListener(new FutureListener<T>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<T> future) throws Exception {
|
|
|
|
|
if (future.isCancelled()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (future.cause() instanceof RedisMovedException) {
|
|
|
|
|
RedisMovedException ex = (RedisMovedException)future.cause();
|
|
|
|
|
writeAllAsync(ex.getSlot(), asyncOperation, counter, mainPromise, attempt);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
int start = key.indexOf('{');
|
|
|
|
|
if (start != -1) {
|
|
|
|
|
int end = key.indexOf('}');
|
|
|
|
|
key = key.substring(start+1, end);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (future.isSuccess()) {
|
|
|
|
|
if (counter.decrementAndGet() == 0
|
|
|
|
|
&& !mainPromise.isDone()) {
|
|
|
|
|
mainPromise.setSuccess(future.getNow());
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
mainPromise.setFailure(future.cause());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
int result = CRC16.crc16(key.getBytes()) % 16384;
|
|
|
|
|
log.debug("slot {} for {}", result, key);
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <V, T> Future<T> writeAsync(String key, AsyncOperation<V, T> asyncOperation) {
|
|
|
|
|
Promise<T> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
int slot = calcSlot(key);
|
|
|
|
|
writeAsync(slot, asyncOperation, mainPromise, 0);
|
|
|
|
|
return mainPromise;
|
|
|
|
|
public <V> V get(Future<V> future) {
|
|
|
|
|
future.awaitUninterruptibly();
|
|
|
|
|
if (future.isSuccess()) {
|
|
|
|
|
return future.getNow();
|
|
|
|
|
}
|
|
|
|
|
throw future.cause() instanceof RedisException ?
|
|
|
|
|
(RedisException) future.cause() :
|
|
|
|
|
new RedisException("Unexpected exception while processing command", future.cause());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <V, T> Future<T> writeAsync(AsyncOperation<V, T> asyncOperation) {
|
|
|
|
|
Promise<T> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
writeAsync(-1, asyncOperation, mainPromise, 0);
|
|
|
|
|
return mainPromise;
|
|
|
|
|
public <T, R> R read(String key, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
return read(key, codec, command, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <V, T> void writeAsync(final int slot, final AsyncOperation<V, T> asyncOperation, final Promise<T> mainPromise, final int attempt) {
|
|
|
|
|
final Promise<T> promise = getGroup().next().newPromise();
|
|
|
|
|
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();
|
|
|
|
|
|
|
|
|
|
TimerTask timerTask = new TimerTask() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run(Timeout timeout) throws Exception {
|
|
|
|
|
if (promise.isDone()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (attempt == config.getRetryAttempts()) {
|
|
|
|
|
promise.setFailure(ex.get());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
promise.cancel(true);
|
|
|
|
|
public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
Future<R> res = readAsync(key, codec, command, params);
|
|
|
|
|
return get(res);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int count = attempt + 1;
|
|
|
|
|
writeAsync(slot, asyncOperation, mainPromise, count);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
Promise<R> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
int slot = calcSlot(key);
|
|
|
|
|
async(true, slot, null, codec, command, params, mainPromise, 0);
|
|
|
|
|
return mainPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
RedisConnection<Object, V> connection = connectionWriteOp(slot);
|
|
|
|
|
RedisAsyncConnection<Object, V> async = connection.getAsync();
|
|
|
|
|
log.debug("writeAsync for slot {} using {}", slot, connection.getRedisClient().getAddr());
|
|
|
|
|
asyncOperation.execute(promise, async);
|
|
|
|
|
public <T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
return readAsync(key, codec, command, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ex.set(new RedisTimeoutException());
|
|
|
|
|
Timeout timeout = timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS);
|
|
|
|
|
promise.addListener(createReleaseWriteListener(slot, connection, timeout));
|
|
|
|
|
} catch (RedisConnectionException e) {
|
|
|
|
|
ex.set(e);
|
|
|
|
|
timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS);
|
|
|
|
|
}
|
|
|
|
|
promise.addListener(new FutureListener<T>() {
|
|
|
|
|
public <T> Future<Void> writeAsyncVoid(String key, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
final Promise<Void> voidPromise = getGroup().next().newPromise();
|
|
|
|
|
Promise<String> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
mainPromise.addListener(new FutureListener<String>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<T> future) throws Exception {
|
|
|
|
|
public void operationComplete(Future<String> future) throws Exception {
|
|
|
|
|
if (future.isCancelled()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (future.cause() instanceof RedisMovedException) {
|
|
|
|
|
RedisMovedException ex = (RedisMovedException)future.cause();
|
|
|
|
|
writeAsync(ex.getSlot(), asyncOperation, mainPromise, attempt);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (future.isSuccess()) {
|
|
|
|
|
mainPromise.setSuccess(future.getNow());
|
|
|
|
|
voidPromise.cancel(true);
|
|
|
|
|
} else {
|
|
|
|
|
mainPromise.setFailure(future.cause());
|
|
|
|
|
if (future.isSuccess()) {
|
|
|
|
|
voidPromise.setSuccess(null);
|
|
|
|
|
} else {
|
|
|
|
|
voidPromise.setFailure(future.cause());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <V, R> R write(String key, SyncInterruptedOperation<V, R> operation) throws InterruptedException {
|
|
|
|
|
int slot = calcSlot(key);
|
|
|
|
|
return write(slot, operation, 0);
|
|
|
|
|
async(false, slot, null, codec, command, params, mainPromise, 0);
|
|
|
|
|
return voidPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <V, R> R write(int slot, SyncInterruptedOperation<V, R> operation, int attempt) throws InterruptedException {
|
|
|
|
|
try {
|
|
|
|
|
RedisConnection<Object, V> connection = connectionWriteOp(slot);
|
|
|
|
|
try {
|
|
|
|
|
return operation.execute(connection);
|
|
|
|
|
} catch (RedisMovedException e) {
|
|
|
|
|
return write(e.getSlot(), operation, attempt);
|
|
|
|
|
} catch (RedisTimeoutException e) {
|
|
|
|
|
if (attempt == config.getRetryAttempts()) {
|
|
|
|
|
throw e;
|
|
|
|
|
}
|
|
|
|
|
attempt++;
|
|
|
|
|
return write(slot, operation, attempt);
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
throw e;
|
|
|
|
|
} finally {
|
|
|
|
|
releaseWrite(slot, connection);
|
|
|
|
|
}
|
|
|
|
|
} catch (RedisConnectionException e) {
|
|
|
|
|
if (attempt == config.getRetryAttempts()) {
|
|
|
|
|
throw e;
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
Thread.sleep(config.getRetryInterval());
|
|
|
|
|
} catch (InterruptedException e1) {
|
|
|
|
|
Thread.currentThread().interrupt();
|
|
|
|
|
}
|
|
|
|
|
attempt++;
|
|
|
|
|
return write(slot, operation, attempt);
|
|
|
|
|
}
|
|
|
|
|
public <R> R write(String key, SyncOperation<R> operation) {
|
|
|
|
|
int slot = calcSlot(key);
|
|
|
|
|
return async(false, slot, operation, 0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <V, R> R write(String key, SyncOperation<V, R> operation) {
|
|
|
|
|
public <R> R read(String key, SyncOperation<R> operation) {
|
|
|
|
|
int slot = calcSlot(key);
|
|
|
|
|
return write(slot, operation, 0);
|
|
|
|
|
return async(true, slot, operation, 0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <V, R> R write(int slot, SyncOperation<V, R> operation, int attempt) {
|
|
|
|
|
private <R> R async(boolean readOnlyMode, int slot, SyncOperation<R> operation, int attempt) {
|
|
|
|
|
try {
|
|
|
|
|
RedisConnection<Object, V> connection = connectionWriteOp(slot);
|
|
|
|
|
RedisConnection connection;
|
|
|
|
|
if (readOnlyMode) {
|
|
|
|
|
connection = connectionReadOp(slot);
|
|
|
|
|
} else {
|
|
|
|
|
connection = connectionWriteOp(slot);
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
return operation.execute(connection);
|
|
|
|
|
return operation.execute(codec, connection);
|
|
|
|
|
} catch (RedisMovedException e) {
|
|
|
|
|
return write(e.getSlot(), operation, attempt);
|
|
|
|
|
return async(readOnlyMode, e.getSlot(), operation, attempt);
|
|
|
|
|
} catch (RedisTimeoutException e) {
|
|
|
|
|
if (attempt == config.getRetryAttempts()) {
|
|
|
|
|
throw e;
|
|
|
|
|
}
|
|
|
|
|
attempt++;
|
|
|
|
|
return write(slot, operation, attempt);
|
|
|
|
|
return async(readOnlyMode, slot, operation, attempt);
|
|
|
|
|
} finally {
|
|
|
|
|
releaseWrite(slot, connection);
|
|
|
|
|
if (readOnlyMode) {
|
|
|
|
|
releaseRead(slot, connection);
|
|
|
|
|
} else {
|
|
|
|
|
releaseWrite(slot, connection);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (RedisConnectionException e) {
|
|
|
|
|
if (attempt == config.getRetryAttempts()) {
|
|
|
|
@ -363,192 +313,81 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
Thread.currentThread().interrupt();
|
|
|
|
|
}
|
|
|
|
|
attempt++;
|
|
|
|
|
return write(slot, operation, attempt);
|
|
|
|
|
return async(readOnlyMode, slot, operation, attempt);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <V, R> R read(String key, SyncOperation<V, R> operation) {
|
|
|
|
|
int slot = calcSlot(key);
|
|
|
|
|
return read(slot, operation, 0);
|
|
|
|
|
public <T, R> R write(String key, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
Future<R> res = writeAsync(key, command, params);
|
|
|
|
|
return get(res);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <V, R> R read(int slot, SyncOperation<V, R> operation, int attempt) {
|
|
|
|
|
try {
|
|
|
|
|
RedisConnection<Object, V> connection = connectionReadOp(slot);
|
|
|
|
|
try {
|
|
|
|
|
return operation.execute(connection);
|
|
|
|
|
} catch (RedisMovedException e) {
|
|
|
|
|
return read(e.getSlot(), operation, attempt);
|
|
|
|
|
} catch (RedisTimeoutException e) {
|
|
|
|
|
if (attempt == config.getRetryAttempts()) {
|
|
|
|
|
throw e;
|
|
|
|
|
}
|
|
|
|
|
attempt++;
|
|
|
|
|
return read(slot, operation, attempt);
|
|
|
|
|
} finally {
|
|
|
|
|
releaseRead(slot, connection);
|
|
|
|
|
}
|
|
|
|
|
} catch (RedisConnectionException e) {
|
|
|
|
|
if (attempt == config.getRetryAttempts()) {
|
|
|
|
|
throw e;
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
Thread.sleep(config.getRetryInterval());
|
|
|
|
|
} catch (InterruptedException e1) {
|
|
|
|
|
Thread.currentThread().interrupt();
|
|
|
|
|
}
|
|
|
|
|
attempt++;
|
|
|
|
|
return read(slot, operation, attempt);
|
|
|
|
|
}
|
|
|
|
|
public <T, R> Future<R> evalAsync(RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
|
|
|
|
|
return evalAsync(codec, evalCommandType, script, keys, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private int calcSlot(String key) {
|
|
|
|
|
if (entries.size() == 1) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int start = key.indexOf('{');
|
|
|
|
|
if (start != -1) {
|
|
|
|
|
int end = key.indexOf('}');
|
|
|
|
|
key = key.substring(start+1, end);
|
|
|
|
|
}
|
|
|
|
|
public <T, R> Future<R> evalAsync(Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
|
|
|
|
|
Promise<R> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
|
|
|
|
|
args.add(script);
|
|
|
|
|
args.add(keys.size());
|
|
|
|
|
args.addAll(keys);
|
|
|
|
|
args.addAll(Arrays.asList(params));
|
|
|
|
|
async(false, -1, null, codec, evalCommandType, args.toArray(), mainPromise, 0);
|
|
|
|
|
return mainPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int result = CRC16.crc16(key.getBytes()) % 16384;
|
|
|
|
|
log.debug("slot {} for {}", result, key);
|
|
|
|
|
return result;
|
|
|
|
|
public <T, R> R eval(RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
|
|
|
|
|
return eval(codec, evalCommandType, script, keys, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <V, R> R write(String key, AsyncOperation<V, R> asyncOperation) {
|
|
|
|
|
Promise<R> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
int slot = calcSlot(key);
|
|
|
|
|
writeAsync(slot, asyncOperation, mainPromise, 0);
|
|
|
|
|
return get(mainPromise);
|
|
|
|
|
public <T, R> R eval(Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
|
|
|
|
|
Future<R> res = evalAsync(codec, evalCommandType, script, keys, params);
|
|
|
|
|
return get(res);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <V> V get(Future<V> future) {
|
|
|
|
|
future.awaitUninterruptibly();
|
|
|
|
|
if (future.isSuccess()) {
|
|
|
|
|
return future.getNow();
|
|
|
|
|
}
|
|
|
|
|
throw future.cause() instanceof RedisException ?
|
|
|
|
|
(RedisException) future.cause() :
|
|
|
|
|
new RedisException("Unexpected exception while processing command", future.cause());
|
|
|
|
|
|
|
|
|
|
public <T, R> Future<R> writeAsync(String key, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
return writeAsync(key, codec, command, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <V, T> T read(String key, AsyncOperation<V, T> asyncOperation) {
|
|
|
|
|
Promise<T> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
int slot = calcSlot(key);
|
|
|
|
|
readAsync(slot, asyncOperation, mainPromise, 0);
|
|
|
|
|
return get(mainPromise);
|
|
|
|
|
public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
Future<R> res = writeAsync(key, codec, command, params);
|
|
|
|
|
return get(res);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <V, T> Future<T> readAsync(String key, AsyncOperation<V, T> asyncOperation) {
|
|
|
|
|
Promise<T> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
Promise<R> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
int slot = calcSlot(key);
|
|
|
|
|
readAsync(slot, asyncOperation, mainPromise, 0);
|
|
|
|
|
async(false, slot, null, codec, command, params, mainPromise, 0);
|
|
|
|
|
return mainPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <V, T> Future<T> readAsync(AsyncOperation<V, T> asyncOperation) {
|
|
|
|
|
Promise<T> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
readAsync(-1, asyncOperation, mainPromise, 0);
|
|
|
|
|
return mainPromise;
|
|
|
|
|
public <T, R> R write(RedisCommand<T> command, Object ... params) {
|
|
|
|
|
return write(codec, command, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <V, T> void readAsync(final int slot, final AsyncOperation<V, T> asyncOperation, final Promise<T> mainPromise, final int attempt) {
|
|
|
|
|
final Promise<T> promise = getGroup().next().newPromise();
|
|
|
|
|
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();
|
|
|
|
|
|
|
|
|
|
TimerTask timerTask = new TimerTask() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run(Timeout timeout) throws Exception {
|
|
|
|
|
if (promise.isDone()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (attempt == config.getRetryAttempts()) {
|
|
|
|
|
promise.setFailure(ex.get());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
promise.cancel(true);
|
|
|
|
|
|
|
|
|
|
int count = attempt + 1;
|
|
|
|
|
readAsync(slot, asyncOperation, mainPromise, count);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
RedisConnection<Object, V> connection = connectionReadOp(slot);
|
|
|
|
|
RedisAsyncConnection<Object, V> async = connection.getAsync();
|
|
|
|
|
log.debug("readAsync for slot {} using {}", slot, connection.getRedisClient().getAddr());
|
|
|
|
|
asyncOperation.execute(promise, async);
|
|
|
|
|
|
|
|
|
|
ex.set(new RedisTimeoutException());
|
|
|
|
|
Timeout timeout = timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS);
|
|
|
|
|
promise.addListener(createReleaseReadListener(slot, connection, timeout));
|
|
|
|
|
} catch (RedisConnectionException e) {
|
|
|
|
|
ex.set(e);
|
|
|
|
|
timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS);
|
|
|
|
|
}
|
|
|
|
|
promise.addListener(new FutureListener<T>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<T> future) throws Exception {
|
|
|
|
|
if (future.isCancelled()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
// TODO cancel timeout
|
|
|
|
|
|
|
|
|
|
if (future.cause() instanceof RedisMovedException) {
|
|
|
|
|
RedisMovedException ex = (RedisMovedException)future.cause();
|
|
|
|
|
readAsync(ex.getSlot(), asyncOperation, mainPromise, attempt);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
public <T, R> R write(Codec codec, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
Future<R> res = writeAsync(codec, command, params);
|
|
|
|
|
return get(res);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (future.isSuccess()) {
|
|
|
|
|
mainPromise.setSuccess(future.getNow());
|
|
|
|
|
} else {
|
|
|
|
|
mainPromise.setFailure(future.cause());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
public <T, R> Future<R> writeAsync(RedisCommand<T> command, Object ... params) {
|
|
|
|
|
return writeAsync(codec, command, params);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
public <T, R> Future<R> writeAsync(Codec codec, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
Promise<R> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
int slot = calcSlot(key);
|
|
|
|
|
async(true, slot, new StringCodec(), command, params, mainPromise, 0);
|
|
|
|
|
return mainPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public Future<Void> writeAsyncVoid(String key, RedisCommand<String> command, Object ... params) {
|
|
|
|
|
final Promise<Void> voidPromise = getGroup().next().newPromise();
|
|
|
|
|
Promise<String> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
mainPromise.addListener(new FutureListener<String>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<String> future) throws Exception {
|
|
|
|
|
if (future.isCancelled()) {
|
|
|
|
|
voidPromise.cancel(true);
|
|
|
|
|
} else {
|
|
|
|
|
if (future.isSuccess()) {
|
|
|
|
|
voidPromise.setSuccess(null);
|
|
|
|
|
} else {
|
|
|
|
|
voidPromise.setFailure(future.cause());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
int slot = calcSlot(key);
|
|
|
|
|
async(false, slot, new StringCodec(), command, params, mainPromise, 0);
|
|
|
|
|
return voidPromise;
|
|
|
|
|
}
|
|
|
|
|
for (Integer slot : entries.keySet()) {
|
|
|
|
|
async(false, slot, null, codec, command, params, mainPromise, 0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <T, R> Future<R> writeAsync(String key, RedisCommand<T> command, Object ... params) {
|
|
|
|
|
Promise<R> mainPromise = getGroup().next().newPromise();
|
|
|
|
|
int slot = calcSlot(key);
|
|
|
|
|
async(false, slot, new StringCodec(), command, params, mainPromise, 0);
|
|
|
|
|
return mainPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <V, R> void async(final boolean readOnlyMode, final int slot, final Codec codec, final RedisCommand<V> command,
|
|
|
|
|
|
|
|
|
|
private <V, R> void async(final boolean readOnlyMode, final int slot, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command,
|
|
|
|
|
final Object[] params, final Promise<R> mainPromise, final int attempt) {
|
|
|
|
|
final Promise<R> attemptPromise = getGroup().next().newPromise();
|
|
|
|
|
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();
|
|
|
|
@ -566,23 +405,28 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
attemptPromise.cancel(true);
|
|
|
|
|
|
|
|
|
|
int count = attempt + 1;
|
|
|
|
|
async(readOnlyMode, slot, codec, command, params, mainPromise, count);
|
|
|
|
|
async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, count);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
org.redisson.client.RedisConnection connection;
|
|
|
|
|
if (readOnlyMode) {
|
|
|
|
|
connection = connectionReadOp2(slot);
|
|
|
|
|
connection = connectionReadOp(slot);
|
|
|
|
|
} else {
|
|
|
|
|
connection = connectionReadOp2(slot);
|
|
|
|
|
connection = connectionWriteOp(slot);
|
|
|
|
|
}
|
|
|
|
|
log.debug("readAsync for slot {} using {}", slot, connection.getRedisClient().getAddr());
|
|
|
|
|
connection.send(new RedisData<V, R>(attemptPromise, codec, command, params));
|
|
|
|
|
connection.send(new RedisData<V, R>(attemptPromise, messageDecoder, codec, command, params));
|
|
|
|
|
|
|
|
|
|
ex.set(new RedisTimeoutException());
|
|
|
|
|
Timeout timeout = timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS);
|
|
|
|
|
attemptPromise.addListener(createReleaseReadListener(slot, connection, timeout));
|
|
|
|
|
|
|
|
|
|
if (readOnlyMode) {
|
|
|
|
|
attemptPromise.addListener(createReleaseReadListener(slot, connection, timeout));
|
|
|
|
|
} else {
|
|
|
|
|
attemptPromise.addListener(createReleaseWriteListener(slot, connection, timeout));
|
|
|
|
|
}
|
|
|
|
|
} catch (RedisConnectionException e) {
|
|
|
|
|
ex.set(e);
|
|
|
|
|
timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS);
|
|
|
|
@ -597,7 +441,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
|
|
|
|
|
if (future.cause() instanceof RedisMovedException) {
|
|
|
|
|
RedisMovedException ex = (RedisMovedException)future.cause();
|
|
|
|
|
async(readOnlyMode, ex.getSlot(), codec, command, params, mainPromise, attempt);
|
|
|
|
|
async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, attempt);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -617,7 +461,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public <K, V> PubSubConnectionEntry subscribe(String channelName) {
|
|
|
|
|
public PubSubConnectionEntry subscribe(String channelName) {
|
|
|
|
|
// multiple channel names per PubSubConnections allowed
|
|
|
|
|
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
|
|
|
|
|
if (сonnEntry != null) {
|
|
|
|
@ -638,14 +482,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
entry.release();
|
|
|
|
|
return subscribe(channelName);
|
|
|
|
|
}
|
|
|
|
|
entry.subscribe(channelName);
|
|
|
|
|
entry.subscribe(codec, channelName);
|
|
|
|
|
return entry;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int slot = -1;
|
|
|
|
|
RedisPubSubConnection<K, V> conn = nextPubSubConnection(slot);
|
|
|
|
|
RedisPubSubConnection conn = nextPubSubConnection(slot);
|
|
|
|
|
|
|
|
|
|
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
|
|
|
|
|
entry.tryAcquire();
|
|
|
|
@ -660,13 +504,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
entry.release();
|
|
|
|
|
return subscribe(channelName);
|
|
|
|
|
}
|
|
|
|
|
entry.subscribe(channelName);
|
|
|
|
|
entry.subscribe(codec, channelName);
|
|
|
|
|
return entry;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public <K, V> PubSubConnectionEntry psubscribe(String channelName) {
|
|
|
|
|
public PubSubConnectionEntry psubscribe(String channelName) {
|
|
|
|
|
// multiple channel names per PubSubConnections allowed
|
|
|
|
|
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
|
|
|
|
|
if (сonnEntry != null) {
|
|
|
|
@ -687,14 +531,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
entry.release();
|
|
|
|
|
return psubscribe(channelName);
|
|
|
|
|
}
|
|
|
|
|
entry.psubscribe(channelName);
|
|
|
|
|
entry.psubscribe(codec, channelName);
|
|
|
|
|
return entry;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int slot = -1;
|
|
|
|
|
RedisPubSubConnection<K, V> conn = nextPubSubConnection(slot);
|
|
|
|
|
RedisPubSubConnection conn = nextPubSubConnection(slot);
|
|
|
|
|
|
|
|
|
|
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
|
|
|
|
|
entry.tryAcquire();
|
|
|
|
@ -709,17 +553,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
entry.release();
|
|
|
|
|
return psubscribe(channelName);
|
|
|
|
|
}
|
|
|
|
|
entry.psubscribe(channelName);
|
|
|
|
|
entry.psubscribe(codec, channelName);
|
|
|
|
|
return entry;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public <K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<V> listener, String channelName) {
|
|
|
|
|
public Future<PubSubStatusMessage> subscribe(RedisPubSubListener listener, String channelName) {
|
|
|
|
|
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
|
|
|
|
|
if (сonnEntry != null) {
|
|
|
|
|
сonnEntry.subscribe(listener, channelName);
|
|
|
|
|
return сonnEntry;
|
|
|
|
|
return сonnEntry.subscribe(codec, listener, channelName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Set<PubSubConnectionEntry> entries = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
|
|
|
|
@ -728,36 +571,34 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
|
|
|
|
|
if (oldEntry != null) {
|
|
|
|
|
entry.release();
|
|
|
|
|
return oldEntry;
|
|
|
|
|
return group.next().newSucceededFuture(new PubSubStatusMessage(Type.SUBSCRIBE, Arrays.asList(channelName)));
|
|
|
|
|
}
|
|
|
|
|
synchronized (entry) {
|
|
|
|
|
if (!entry.isActive()) {
|
|
|
|
|
entry.release();
|
|
|
|
|
return subscribe(listener, channelName);
|
|
|
|
|
}
|
|
|
|
|
entry.subscribe(listener, channelName);
|
|
|
|
|
return entry;
|
|
|
|
|
return entry.subscribe(codec, listener, channelName);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int slot = -1;
|
|
|
|
|
RedisPubSubConnection<K, V> conn = nextPubSubConnection(slot);
|
|
|
|
|
RedisPubSubConnection conn = nextPubSubConnection(slot);
|
|
|
|
|
|
|
|
|
|
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
|
|
|
|
|
entry.tryAcquire();
|
|
|
|
|
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
|
|
|
|
|
if (oldEntry != null) {
|
|
|
|
|
returnSubscribeConnection(slot, entry);
|
|
|
|
|
return oldEntry;
|
|
|
|
|
return group.next().newSucceededFuture(new PubSubStatusMessage(Type.SUBSCRIBE, Arrays.asList(channelName)));
|
|
|
|
|
}
|
|
|
|
|
synchronized (entry) {
|
|
|
|
|
if (!entry.isActive()) {
|
|
|
|
|
entry.release();
|
|
|
|
|
return subscribe(listener, channelName);
|
|
|
|
|
}
|
|
|
|
|
entry.subscribe(listener, channelName);
|
|
|
|
|
return entry;
|
|
|
|
|
return entry.subscribe(codec, listener, channelName);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -860,23 +701,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
return entries.remove(endSlot);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected <K, V> RedisConnection<K, V> connectionWriteOp(int slot) {
|
|
|
|
|
protected RedisConnection connectionWriteOp(int slot) {
|
|
|
|
|
return getEntry(slot).connectionWriteOp();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public <K, V> RedisConnection<K, V> connectionReadOp(int slot) {
|
|
|
|
|
public RedisConnection connectionReadOp(int slot) {
|
|
|
|
|
return getEntry(slot).connectionReadOp();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public org.redisson.client.RedisConnection connectionReadOp2(int slot) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected org.redisson.client.RedisConnection connectionWriteOp2(int slot) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RedisPubSubConnection nextPubSubConnection(int slot) {
|
|
|
|
|
return getEntry(slot).nextPubSubConnection();
|
|
|
|
|
}
|
|
|
|
@ -893,11 +726,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
getEntry(slot).releaseRead(connection);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void releaseRead(int slot, org.redisson.client.RedisConnection connection) {
|
|
|
|
|
// getEntry(slot).releaseRead(connection);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void shutdown() {
|
|
|
|
|
for (MasterSlaveEntry entry : entries.values()) {
|
|
|
|
|