Cluster MOVED response handling. #273

pull/282/head
Nikita 9 years ago
parent 055ddcc8c1
commit 9174a33994

@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisAskException; import org.redisson.client.RedisAskException;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException; import org.redisson.client.RedisMovedException;
@ -38,6 +37,7 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -105,7 +105,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
@Override @Override
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource, MultiDecoder<Object> messageDecoder, protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource, MultiDecoder<Object> messageDecoder,
Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, RedisClient client, int attempt) { Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, int attempt) {
if (executed) { if (executed) {
throw new IllegalStateException("Batch already executed!"); throw new IllegalStateException("Batch already executed!");
} }
@ -254,12 +254,12 @@ public class CommandBatchExecutorService extends CommandExecutorService {
if (future.cause() instanceof RedisMovedException) { if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause(); RedisMovedException ex = (RedisMovedException)future.cause();
execute(entry, new NodeSource(ex.getSlot()), mainPromise, slots, attempt); execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt);
return; return;
} }
if (future.cause() instanceof RedisAskException) { if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause(); RedisAskException ex = (RedisAskException)future.cause();
execute(entry, new NodeSource(ex.getAddr()), mainPromise, slots, attempt); execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt);
return; return;
} }

@ -15,10 +15,10 @@
*/ */
package org.redisson; package org.redisson;
import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
@ -33,9 +33,9 @@ import io.netty.util.concurrent.Future;
//TODO ping support //TODO ping support
public interface CommandExecutor { public interface CommandExecutor {
<T, R> R read(RedisClient client, String key, Codec codec, RedisCommand<T> command, Object ... params); <T, R> R read(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R read(RedisClient client, String key, RedisCommand<T> command, Object ... params); <T, R> R read(InetSocketAddress client, String key, RedisCommand<T> command, Object ... params);
<T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params); <T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params);

@ -15,6 +15,7 @@
*/ */
package org.redisson; package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -27,7 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisAskException; import org.redisson.client.RedisAskException;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException; import org.redisson.client.RedisMovedException;
@ -42,6 +42,7 @@ import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -103,7 +104,7 @@ public class CommandExecutorService implements CommandExecutor {
}; };
for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, null, 0); async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, 0);
} }
return mainPromise; return mainPromise;
} }
@ -140,7 +141,7 @@ public class CommandExecutorService implements CommandExecutor {
}); });
ClusterSlotRange slot = slots.remove(0); ClusterSlotRange slot = slots.remove(0);
async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, attemptPromise, null, 0); async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, attemptPromise, 0);
} }
public <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params) { public <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params) {
@ -177,7 +178,7 @@ public class CommandExecutorService implements CommandExecutor {
} }
}; };
for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, null, 0); async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, 0);
} }
return mainPromise; return mainPromise;
} }
@ -205,28 +206,28 @@ public class CommandExecutorService implements CommandExecutor {
return get(res); return get(res);
} }
public <T, R> R read(RedisClient client, String key, RedisCommand<T> command, Object ... params) { public <T, R> R read(InetSocketAddress client, String key, RedisCommand<T> command, Object ... params) {
Future<R> res = readAsync(client, key, connectionManager.getCodec(), command, params); Future<R> res = readAsync(client, key, connectionManager.getCodec(), command, params);
return get(res); return get(res);
} }
public <T, R> R read(RedisClient client, String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> R read(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> res = readAsync(client, key, codec, command, params); Future<R> res = readAsync(client, key, codec, command, params);
return get(res); return get(res);
} }
public <T, R> Future<R> readAsync(RedisClient client, String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
async(true, new NodeSource(slot), null, codec, command, params, mainPromise, client, 0); async(true, new NodeSource(slot, client), null, codec, command, params, mainPromise, 0);
return mainPromise; return mainPromise;
} }
public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
async(true, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0); async(true, new NodeSource(slot), null, codec, command, params, mainPromise, 0);
return mainPromise; return mainPromise;
} }
@ -237,7 +238,7 @@ public class CommandExecutorService implements CommandExecutor {
public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
async(false, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0); async(false, new NodeSource(slot), null, codec, command, params, mainPromise, 0);
return mainPromise; return mainPromise;
} }
@ -274,9 +275,9 @@ public class CommandExecutorService implements CommandExecutor {
try { try {
return operation.execute(codec, connection); return operation.execute(codec, connection);
} catch (RedisMovedException e) { } catch (RedisMovedException e) {
return async(readOnlyMode, codec, new NodeSource(e.getSlot()), operation, attempt); return async(readOnlyMode, codec, new NodeSource(e.getSlot(), e.getAddr(), Redirect.MOVED), operation, attempt);
} catch (RedisAskException e) { } catch (RedisAskException e) {
return async(readOnlyMode, codec, new NodeSource(e.getAddr()), operation, attempt); return async(readOnlyMode, codec, new NodeSource(e.getSlot(), e.getAddr(), Redirect.ASK), operation, attempt);
} catch (RedisTimeoutException e) { } catch (RedisTimeoutException e) {
if (attempt == connectionManager.getConfig().getRetryAttempts()) { if (attempt == connectionManager.getConfig().getRetryAttempts()) {
throw e; throw e;
@ -361,7 +362,7 @@ public class CommandExecutorService implements CommandExecutor {
args.addAll(keys); args.addAll(keys);
args.addAll(Arrays.asList(params)); args.addAll(Arrays.asList(params));
for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, args.toArray(), promise, null, 0); async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, args.toArray(), promise, 0);
} }
return mainPromise; return mainPromise;
} }
@ -374,7 +375,7 @@ public class CommandExecutorService implements CommandExecutor {
args.addAll(keys); args.addAll(keys);
args.addAll(Arrays.asList(params)); args.addAll(Arrays.asList(params));
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
async(readOnlyMode, new NodeSource(slot), null, codec, evalCommandType, args.toArray(), mainPromise, null, 0); async(readOnlyMode, new NodeSource(slot), null, codec, evalCommandType, args.toArray(), mainPromise, 0);
return mainPromise; return mainPromise;
} }
@ -404,12 +405,12 @@ public class CommandExecutorService implements CommandExecutor {
public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
async(false, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0); async(false, new NodeSource(slot), null, codec, command, params, mainPromise, 0);
return mainPromise; return mainPromise;
} }
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command, protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command,
final Object[] params, final Promise<R> mainPromise, final RedisClient client, final int attempt) { final Object[] params, final Promise<R> mainPromise, final int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) { if (!connectionManager.getShutdownLatch().acquire()) {
mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
return; return;
@ -433,7 +434,7 @@ public class CommandExecutorService implements CommandExecutor {
} }
int count = attempt + 1; int count = attempt + 1;
async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, client, count); async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, count);
} }
}; };
@ -442,11 +443,7 @@ public class CommandExecutorService implements CommandExecutor {
final Future<RedisConnection> connectionFuture; final Future<RedisConnection> connectionFuture;
if (readOnlyMode) { if (readOnlyMode) {
if (client != null) { connectionFuture = connectionManager.connectionReadOp(source, command);
connectionFuture = connectionManager.connectionReadOp(source, command, client);
} else {
connectionFuture = connectionManager.connectionReadOp(source, command);
}
} else { } else {
connectionFuture = connectionManager.connectionWriteOp(source, command); connectionFuture = connectionManager.connectionWriteOp(source, command);
} }
@ -470,8 +467,7 @@ public class CommandExecutorService implements CommandExecutor {
RedisConnection connection = connFuture.getNow(); RedisConnection connection = connFuture.getNow();
ChannelFuture future = null; ChannelFuture future = null;
if (source.getAddr() != null) { if (source.getRedirect() == Redirect.ASK) {
// ASK handling
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2); List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
Promise<Void> promise = connectionManager.newPromise(); Promise<Void> promise = connectionManager.newPromise();
list.add(new CommandData<Void, Void>(promise, codec, RedisCommands.ASKING, new Object[] {})); list.add(new CommandData<Void, Void>(promise, codec, RedisCommands.ASKING, new Object[] {}));
@ -524,7 +520,7 @@ public class CommandExecutorService implements CommandExecutor {
RedisMovedException ex = (RedisMovedException)future.cause(); RedisMovedException ex = (RedisMovedException)future.cause();
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, new NodeSource(ex.getSlot()), messageDecoder, codec, command, params, mainPromise, client, attempt); async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), messageDecoder, codec, command, params, mainPromise, attempt);
return; return;
} }
@ -535,18 +531,18 @@ public class CommandExecutorService implements CommandExecutor {
RedisAskException ex = (RedisAskException)future.cause(); RedisAskException ex = (RedisAskException)future.cause();
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, new NodeSource(ex.getAddr()), messageDecoder, codec, command, params, mainPromise, client, attempt); async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), messageDecoder, codec, command, params, mainPromise, attempt);
return; return;
} }
if (future.isSuccess()) { if (future.isSuccess()) {
R res = future.getNow(); R res = future.getNow();
if (res instanceof RedisClientResult) { if (res instanceof RedisClientResult) {
RedisClient redisClient = client; InetSocketAddress addr = source.getAddr();
if (redisClient == null) { if (addr == null) {
redisClient = connectionFuture.getNow().getRedisClient(); addr = connectionFuture.getNow().getRedisClient().getAddr();
} }
((RedisClientResult)res).setRedisClient(redisClient); ((RedisClientResult)res).setRedisClient(addr);
} }
mainPromise.setSuccess(res); mainPromise.setSuccess(res);
} else { } else {

@ -15,12 +15,12 @@
*/ */
package org.redisson; package org.redisson;
import org.redisson.client.RedisClient; import java.net.InetSocketAddress;
public interface RedisClientResult { public interface RedisClientResult {
void setRedisClient(RedisClient client); void setRedisClient(InetSocketAddress addr);
RedisClient getRedisClient(); InetSocketAddress getRedisClient();
} }

@ -16,6 +16,7 @@
package org.redisson; package org.redisson;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -27,7 +28,6 @@ import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Set; import java.util.Set;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
@ -293,7 +293,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return get(fastRemoveAsync(keys)); return get(fastRemoveAsync(keys));
} }
private MapScanResult<Object, V> scanIterator(RedisClient client, long startPos) { private MapScanResult<Object, V> scanIterator(InetSocketAddress client, long startPos) {
return commandExecutor.read(client, getName(), codec, RedisCommands.HSCAN, getName(), startPos); return commandExecutor.read(client, getName(), codec, RedisCommands.HSCAN, getName(), startPos);
} }
@ -303,7 +303,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
private Map<K, V> firstValues; private Map<K, V> firstValues;
private Iterator<Map.Entry<K, V>> iter; private Iterator<Map.Entry<K, V>> iter;
private long iterPos = 0; private long iterPos = 0;
private RedisClient client; private InetSocketAddress client;
private boolean removeExecuted; private boolean removeExecuted;
private Map.Entry<K,V> value; private Map.Entry<K,V> value;

@ -16,13 +16,13 @@
package org.redisson; package org.redisson;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
@ -155,7 +155,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK, getName(), o); return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK, getName(), o);
} }
private ListScanResult<V> scanIterator(RedisClient client, long startPos) { private ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {
return commandExecutor.read(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos); return commandExecutor.read(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos);
} }
@ -165,7 +165,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
private List<V> firstValues; private List<V> firstValues;
private Iterator<V> iter; private Iterator<V> iter;
private RedisClient client; private InetSocketAddress client;
private long iterPos; private long iterPos;
private boolean removeExecuted; private boolean removeExecuted;

@ -15,6 +15,7 @@
*/ */
package org.redisson; package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -22,7 +23,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -76,7 +76,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
return commandExecutor.readAsync(getName(), codec, RedisCommands.SISMEMBER, getName(), o); return commandExecutor.readAsync(getName(), codec, RedisCommands.SISMEMBER, getName(), o);
} }
private ListScanResult<V> scanIterator(RedisClient client, long startPos) { private ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {
return commandExecutor.read(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos); return commandExecutor.read(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos);
} }
@ -86,7 +86,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
private List<V> firstValues; private List<V> firstValues;
private Iterator<V> iter; private Iterator<V> iter;
private RedisClient client; private InetSocketAddress client;
private long iterPos; private long iterPos;
private boolean removeExecuted; private boolean removeExecuted;

@ -15,16 +15,16 @@
*/ */
package org.redisson.client.protocol.decoder; package org.redisson.client.protocol.decoder;
import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import org.redisson.RedisClientResult; import org.redisson.RedisClientResult;
import org.redisson.client.RedisClient;
public class ListScanResult<V> implements RedisClientResult { public class ListScanResult<V> implements RedisClientResult {
private final Long pos; private final Long pos;
private final List<V> values; private final List<V> values;
private RedisClient client; private InetSocketAddress addr;
public ListScanResult(Long pos, List<V> values) { public ListScanResult(Long pos, List<V> values) {
this.pos = pos; this.pos = pos;
@ -40,12 +40,12 @@ public class ListScanResult<V> implements RedisClientResult {
} }
@Override @Override
public void setRedisClient(RedisClient client) { public void setRedisClient(InetSocketAddress addr) {
this.client = client; this.addr = addr;
} }
public RedisClient getRedisClient() { public InetSocketAddress getRedisClient() {
return client; return addr;
} }
} }

@ -15,16 +15,16 @@
*/ */
package org.redisson.client.protocol.decoder; package org.redisson.client.protocol.decoder;
import java.net.InetSocketAddress;
import java.util.Map; import java.util.Map;
import org.redisson.RedisClientResult; import org.redisson.RedisClientResult;
import org.redisson.client.RedisClient;
public class MapScanResult<K, V> implements RedisClientResult { public class MapScanResult<K, V> implements RedisClientResult {
private final Long pos; private final Long pos;
private final Map<K, V> values; private final Map<K, V> values;
private RedisClient client; private InetSocketAddress client;
public MapScanResult(Long pos, Map<K, V> values) { public MapScanResult(Long pos, Map<K, V> values) {
super(); super();
@ -41,11 +41,11 @@ public class MapScanResult<K, V> implements RedisClientResult {
} }
@Override @Override
public void setRedisClient(RedisClient client) { public void setRedisClient(InetSocketAddress client) {
this.client = client; this.client = client;
} }
public RedisClient getRedisClient() { public InetSocketAddress getRedisClient() {
return client; return client;
} }

@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import org.redisson.MasterSlaveServersConfig; import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
@ -41,7 +40,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
private ConnectionManager connectionManager; private ConnectionManager connectionManager;
final Map<RedisClient, SubscribesConnectionEntry> client2Entry = PlatformDependent.newConcurrentHashMap(); final Map<InetSocketAddress, SubscribesConnectionEntry> addr2Entry = PlatformDependent.newConcurrentHashMap();
PubSubConnectionPoll pubSubEntries; PubSubConnectionPoll pubSubEntries;
@ -54,14 +53,14 @@ abstract class BaseLoadBalancer implements LoadBalancer {
} }
public synchronized void add(SubscribesConnectionEntry entry) { public synchronized void add(SubscribesConnectionEntry entry) {
client2Entry.put(entry.getClient(), entry); addr2Entry.put(entry.getClient().getAddr(), entry);
entries.add(entry); entries.add(entry);
pubSubEntries.add(entry); pubSubEntries.add(entry);
} }
public int getAvailableClients() { public int getAvailableClients() {
int count = 0; int count = 0;
for (SubscribesConnectionEntry connectionEntry : client2Entry.values()) { for (SubscribesConnectionEntry connectionEntry : addr2Entry.values()) {
if (!connectionEntry.isFreezed()) { if (!connectionEntry.isFreezed()) {
count++; count++;
} }
@ -71,7 +70,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
public synchronized boolean unfreeze(String host, int port, FreezeReason freezeReason) { public synchronized boolean unfreeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port); InetSocketAddress addr = new InetSocketAddress(host, port);
for (SubscribesConnectionEntry connectionEntry : client2Entry.values()) { for (SubscribesConnectionEntry connectionEntry : addr2Entry.values()) {
if (!connectionEntry.getClient().getAddr().equals(addr)) { if (!connectionEntry.getClient().getAddr().equals(addr)) {
continue; continue;
} }
@ -91,7 +90,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
public synchronized Collection<RedisPubSubConnection> freeze(String host, int port, FreezeReason freezeReason) { public synchronized Collection<RedisPubSubConnection> freeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port); InetSocketAddress addr = new InetSocketAddress(host, port);
for (SubscribesConnectionEntry connectionEntry : client2Entry.values()) { for (SubscribesConnectionEntry connectionEntry : addr2Entry.values()) {
if (connectionEntry.isFreezed() if (connectionEntry.isFreezed()
|| !connectionEntry.getClient().getAddr().equals(addr)) { || !connectionEntry.getClient().getAddr().equals(addr)) {
continue; continue;
@ -136,12 +135,12 @@ abstract class BaseLoadBalancer implements LoadBalancer {
return pubSubEntries.get(); return pubSubEntries.get();
} }
public Future<RedisConnection> getConnection(RedisClient client) { public Future<RedisConnection> getConnection(InetSocketAddress addr) {
SubscribesConnectionEntry entry = client2Entry.get(client); SubscribesConnectionEntry entry = addr2Entry.get(addr);
if (entry != null) { if (entry != null) {
return entries.get(entry); return entries.get(entry);
} }
RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + client.getAddr()); RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + addr);
return connectionManager.getGroup().next().newFailedFuture(exception); return connectionManager.getGroup().next().newFailedFuture(exception);
} }
@ -150,24 +149,24 @@ abstract class BaseLoadBalancer implements LoadBalancer {
} }
public void returnSubscribeConnection(RedisPubSubConnection connection) { public void returnSubscribeConnection(RedisPubSubConnection connection) {
SubscribesConnectionEntry entry = client2Entry.get(connection.getRedisClient()); SubscribesConnectionEntry entry = addr2Entry.get(connection.getRedisClient().getAddr());
pubSubEntries.returnConnection(entry, connection); pubSubEntries.returnConnection(entry, connection);
} }
public void returnConnection(RedisConnection connection) { public void returnConnection(RedisConnection connection) {
SubscribesConnectionEntry entry = client2Entry.get(connection.getRedisClient()); SubscribesConnectionEntry entry = addr2Entry.get(connection.getRedisClient().getAddr());
entries.returnConnection(entry, connection); entries.returnConnection(entry, connection);
} }
public void shutdown() { public void shutdown() {
for (SubscribesConnectionEntry entry : client2Entry.values()) { for (SubscribesConnectionEntry entry : addr2Entry.values()) {
entry.getClient().shutdown(); entry.getClient().shutdown();
} }
} }
public void shutdownAsync() { public void shutdownAsync() {
for (RedisClient client : client2Entry.keySet()) { for (SubscribesConnectionEntry entry : addr2Entry.values()) {
connectionManager.shutdownAsync(client); connectionManager.shutdownAsync(entry.getClient());
} }
} }

@ -72,8 +72,6 @@ public interface ConnectionManager {
Future<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command); Future<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command);
Future<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command, RedisClient client);
Future<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command); Future<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command);
<T> FutureListener<T> createReleaseReadListener(NodeSource source, <T> FutureListener<T> createReleaseReadListener(NodeSource source,

@ -15,11 +15,11 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import org.redisson.MasterSlaveServersConfig; import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.connection.ConnectionEntry.FreezeReason; import org.redisson.connection.ConnectionEntry.FreezeReason;
@ -30,7 +30,7 @@ public interface LoadBalancer {
SubscribesConnectionEntry getEntry(List<SubscribesConnectionEntry> clientsCopy); SubscribesConnectionEntry getEntry(List<SubscribesConnectionEntry> clientsCopy);
Future<RedisConnection> getConnection(RedisClient client); Future<RedisConnection> getConnection(InetSocketAddress addr);
int getAvailableClients(); int getAvailableClients();

@ -571,33 +571,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
private MasterSlaveEntry getEntry(NodeSource source) { private MasterSlaveEntry getEntry(NodeSource source) {
MasterSlaveEntry e = null; MasterSlaveEntry e = getEntry(source.getSlot());
if (source.getSlot() != null) { if (e == null) {
e = getEntry(source.getSlot()); throw new RedisNodeNotFoundException("No node with slot: " + source.getSlot());
if (e == null) {
throw new RedisNodeNotFoundException("No node with slot: " + source.getSlot());
}
} else {
e = getEntry(source.getAddr());
if (e == null) {
throw new RedisNodeNotFoundException("No node with addr: " + source.getAddr());
}
} }
return e; return e;
} }
private MasterSlaveEntry getEntry(NodeSource source, RedisCommand<?> command) { private MasterSlaveEntry getEntry(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry e = null; MasterSlaveEntry e = getEntry(source.getSlot());
if (source.getSlot() != null) { if (e == null) {
e = getEntry(source.getSlot()); throw new RedisNodeNotFoundException("No node for slot: " + source.getSlot() + " and command " + command);
if (e == null) {
throw new RedisNodeNotFoundException("No node for slot: " + source.getSlot() + " and command " + command);
}
} else {
e = getEntry(source.getAddr());
if (e == null) {
throw new RedisNodeNotFoundException("No node for addr: " + source.getAddr() + " and command " + command);
}
} }
return e; return e;
} }
@ -605,15 +589,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public Future<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command) { public Future<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry e = getEntry(source, command); MasterSlaveEntry e = getEntry(source, command);
if (source.getAddr() != null) {
return e.connectionReadOp(source.getAddr());
}
return e.connectionReadOp(); return e.connectionReadOp();
} }
@Override
public Future<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command, RedisClient client) {
MasterSlaveEntry e = getEntry(source, command);
return e.connectionReadOp(client);
}
Future<RedisPubSubConnection> nextPubSubConnection(int slot) { Future<RedisPubSubConnection> nextPubSubConnection(int slot) {
return getEntry(slot).nextPubSubConnection(); return getEntry(slot).nextPubSubConnection();
} }

@ -154,8 +154,8 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
return slaveBalancer.nextConnection(); return slaveBalancer.nextConnection();
} }
public Future<RedisConnection> connectionReadOp(RedisClient client) { public Future<RedisConnection> connectionReadOp(InetSocketAddress addr) {
return slaveBalancer.getConnection(client); return slaveBalancer.getConnection(addr);
} }

@ -19,20 +19,28 @@ import java.net.InetSocketAddress;
public class NodeSource { public class NodeSource {
public enum Redirect {MOVED, ASK}
private final Integer slot; private final Integer slot;
private final InetSocketAddress addr; private final InetSocketAddress addr;
private final Redirect redirect;
public NodeSource(Integer slot) { public NodeSource(Integer slot) {
this(slot, null); this(slot, null, null);
} }
public NodeSource(InetSocketAddress addr) { public NodeSource(Integer slot, InetSocketAddress addr) {
this(null, addr); this(slot, addr, null);
} }
private NodeSource(Integer slot, InetSocketAddress addr) { public NodeSource(Integer slot, InetSocketAddress addr, Redirect redirect) {
this.slot = slot; this.slot = slot;
this.addr = addr; this.addr = addr;
this.redirect = redirect;
}
public Redirect getRedirect() {
return redirect;
} }
public Integer getSlot() { public Integer getSlot() {
@ -45,10 +53,7 @@ public class NodeSource {
@Override @Override
public String toString() { public String toString() {
if (addr != null) { return "NodeSource [slot=" + slot + ", addr=" + addr + ", redirect=" + redirect + "]";
return addr.toString();
}
return slot.toString();
} }
} }

@ -15,6 +15,7 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Set; import java.util.Set;
import org.redisson.MasterSlaveServersConfig; import org.redisson.MasterSlaveServersConfig;
@ -56,6 +57,11 @@ public class SingleEntry extends MasterSlaveEntry<SubscribesConnectionEntry> {
pubSubConnectionHolder.returnConnection(masterEntry, entry.getConnection()); pubSubConnectionHolder.returnConnection(masterEntry, entry.getConnection());
} }
@Override
public Future<RedisConnection> connectionReadOp(InetSocketAddress addr) {
return super.connectionWriteOp();
}
@Override @Override
public Future<RedisConnection> connectionReadOp() { public Future<RedisConnection> connectionReadOp() {
return super.connectionWriteOp(); return super.connectionWriteOp();

Loading…
Cancel
Save