use same node for SCAN/SSCAN/HSCAN during iteration. #230

pull/236/merge
Nikita 10 years ago
parent 0f663f0c3e
commit 8bdd6cf67a

@ -24,11 +24,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
@ -102,7 +102,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
@Override
protected <V, R> void async(boolean readOnlyMode, int slot, MultiDecoder<Object> messageDecoder,
Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, int attempt) {
Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, RedisClient client, int attempt) {
if (executed) {
throw new IllegalStateException("Batch already executed!");
}

@ -18,6 +18,7 @@ package org.redisson;
import java.util.Collection;
import java.util.List;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
@ -32,6 +33,8 @@ import io.netty.util.concurrent.Future;
//TODO ping support
public interface CommandExecutor {
<T, R> R read(RedisClient client, String key, RedisCommand<T> command, Object ... params);
<T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params);
<R, T> Future<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);

@ -26,12 +26,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand;
@ -98,7 +98,7 @@ public class CommandExecutorService implements CommandExecutor {
};
for (Integer slot : connectionManager.getEntries().keySet()) {
async(true, slot, null, connectionManager.getCodec(), command, params, promise, 0);
async(true, slot, null, connectionManager.getCodec(), command, params, promise, null, 0);
}
return mainPromise;
}
@ -135,7 +135,7 @@ public class CommandExecutorService implements CommandExecutor {
});
Integer slot = slots.remove(0);
async(true, slot, null, connectionManager.getCodec(), command, params, attemptPromise, 0);
async(true, slot, null, connectionManager.getCodec(), command, params, attemptPromise, null, 0);
}
public <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params) {
@ -172,7 +172,7 @@ public class CommandExecutorService implements CommandExecutor {
}
};
for (Integer slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, slot, null, connectionManager.getCodec(), command, params, promise, 0);
async(readOnlyMode, slot, null, connectionManager.getCodec(), command, params, promise, null, 0);
}
return mainPromise;
}
@ -196,10 +196,22 @@ public class CommandExecutorService implements CommandExecutor {
return get(res);
}
public <T, R> R read(RedisClient client, String key, RedisCommand<T> command, Object ... params) {
Future<R> res = readAsync(client, key, connectionManager.getCodec(), command, params);
return get(res);
}
public <T, R> Future<R> readAsync(RedisClient client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key);
async(true, slot, null, codec, command, params, mainPromise, client, 0);
return mainPromise;
}
public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key);
async(true, slot, null, codec, command, params, mainPromise, 0);
async(true, slot, null, codec, command, params, mainPromise, null, 0);
return mainPromise;
}
@ -210,7 +222,7 @@ public class CommandExecutorService implements CommandExecutor {
public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
async(false, slot, null, codec, command, params, mainPromise, 0);
async(false, slot, null, codec, command, params, mainPromise, null, 0);
return mainPromise;
}
@ -328,7 +340,7 @@ public class CommandExecutorService implements CommandExecutor {
args.addAll(keys);
args.addAll(Arrays.asList(params));
for (Integer slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, slot, null, connectionManager.getCodec(), command, args.toArray(), promise, 0);
async(readOnlyMode, slot, null, connectionManager.getCodec(), command, args.toArray(), promise, null, 0);
}
return mainPromise;
}
@ -341,7 +353,7 @@ public class CommandExecutorService implements CommandExecutor {
args.addAll(keys);
args.addAll(Arrays.asList(params));
int slot = connectionManager.calcSlot(key);
async(readOnlyMode, slot, null, codec, evalCommandType, args.toArray(), mainPromise, 0);
async(readOnlyMode, slot, null, codec, evalCommandType, args.toArray(), mainPromise, null, 0);
return mainPromise;
}
@ -371,12 +383,12 @@ public class CommandExecutorService implements CommandExecutor {
public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key);
async(false, slot, null, codec, command, params, mainPromise, 0);
async(false, slot, null, codec, command, params, mainPromise, null, 0);
return mainPromise;
}
protected <V, R> void async(final boolean readOnlyMode, final int slot, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command,
final Object[] params, final Promise<R> mainPromise, final int attempt) {
final Object[] params, final Promise<R> mainPromise, final RedisClient client, final int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
return;
@ -400,18 +412,22 @@ public class CommandExecutorService implements CommandExecutor {
}
int count = attempt + 1;
async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, count);
async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, client, count);
}
};
try {
org.redisson.client.RedisConnection connection;
RedisConnection connection;
if (readOnlyMode) {
connection = connectionManager.connectionReadOp(slot);
if (client != null) {
connection = connectionManager.connectionReadOp(slot, client);
} else {
connection = connectionManager.connectionReadOp(slot);
}
} else {
connection = connectionManager.connectionWriteOp(slot);
}
log.debug("getting connection for command {} via slot {} using {}", command, slot, connection.getRedisClient().getAddr());
log.debug("getting connection for command {} from slot {} using node {}", command, slot, connection.getRedisClient().getAddr());
ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
ex.set(new RedisTimeoutException());
@ -449,12 +465,16 @@ public class CommandExecutorService implements CommandExecutor {
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, attempt);
async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, client, attempt);
return;
}
if (future.isSuccess()) {
mainPromise.setSuccess(future.getNow());
R res = future.getNow();
if (res instanceof RedisClientResult) {
((RedisClientResult)res).setRedisClient(client);
}
mainPromise.setSuccess(res);
} else {
mainPromise.setFailure(future.cause());
}

@ -0,0 +1,11 @@
package org.redisson;
import org.redisson.client.RedisClient;
public interface RedisClientResult {
void setRedisClient(RedisClient client);
RedisClient getRedisClient();
}

@ -27,6 +27,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -277,8 +278,8 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return get(fastRemoveAsync(keys));
}
private MapScanResult<Object, V> scanIterator(long startPos) {
return commandExecutor.read(getName(), RedisCommands.HSCAN, getName(), startPos);
private MapScanResult<Object, V> scanIterator(RedisClient client, long startPos) {
return commandExecutor.read(client, getName(), RedisCommands.HSCAN, getName(), startPos);
}
private Iterator<Map.Entry<K, V>> iterator() {
@ -286,6 +287,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
private Iterator<Map.Entry<K, V>> iter;
private long iterPos = 0;
private RedisClient client;
private boolean removeExecuted;
private Map.Entry<K,V> value;
@ -294,7 +296,10 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
public boolean hasNext() {
if (iter == null
|| (!iter.hasNext() && iterPos != 0)) {
MapScanResult<Object, V> res = scanIterator(iterPos);
MapScanResult<Object, V> res = scanIterator(client, iterPos);
if (iter == null) {
client = res.getRedisClient();
}
iter = ((Map<K, V>)res.getMap()).entrySet().iterator();
iterPos = res.getPos();
}

@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
@ -68,8 +69,8 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
return commandExecutor.readAsync(getName(), RedisCommands.SISMEMBER, getName(), o);
}
private ListScanResult<V> scanIterator(long startPos) {
return commandExecutor.read(getName(), RedisCommands.SSCAN, getName(), startPos);
private ListScanResult<V> scanIterator(RedisClient client, long startPos) {
return commandExecutor.read(client, getName(), RedisCommands.SSCAN, getName(), startPos);
}
@Override
@ -77,6 +78,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
return new Iterator<V>() {
private Iterator<V> iter;
private RedisClient client;
private Long iterPos;
private boolean removeExecuted;
@ -85,11 +87,12 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public boolean hasNext() {
if (iter == null) {
ListScanResult<V> res = scanIterator(0);
ListScanResult<V> res = scanIterator(null, 0);
client = res.getRedisClient();
iter = res.getValues().iterator();
iterPos = res.getPos();
} else if (!iter.hasNext() && iterPos != 0) {
ListScanResult<V> res = scanIterator(iterPos);
ListScanResult<V> res = scanIterator(client, iterPos);
iter = res.getValues().iterator();
iterPos = res.getPos();
}

@ -17,10 +17,14 @@ package org.redisson.client.protocol.decoder;
import java.util.List;
public class ListScanResult<V> {
import org.redisson.RedisClientResult;
import org.redisson.client.RedisClient;
public class ListScanResult<V> implements RedisClientResult {
private final Long pos;
private final List<V> values;
private RedisClient client;
public ListScanResult(Long pos, List<V> values) {
this.pos = pos;
@ -35,4 +39,13 @@ public class ListScanResult<V> {
return values;
}
@Override
public void setRedisClient(RedisClient client) {
this.client = client;
}
public RedisClient getRedisClient() {
return client;
}
}

@ -17,10 +17,14 @@ package org.redisson.client.protocol.decoder;
import java.util.Map;
public class MapScanResult<K, V> {
import org.redisson.RedisClientResult;
import org.redisson.client.RedisClient;
public class MapScanResult<K, V> implements RedisClientResult {
private final Long pos;
private final Map<K, V> values;
private RedisClient client;
public MapScanResult(Long pos, Map<K, V> values) {
super();
@ -36,4 +40,13 @@ public class MapScanResult<K, V> {
return values;
}
@Override
public void setRedisClient(RedisClient client) {
this.client = client;
}
public RedisClient getRedisClient() {
return client;
}
}

@ -20,10 +20,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Map;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
@ -33,6 +33,8 @@ import org.redisson.misc.ReclosableLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.internal.PlatformDependent;
abstract class BaseLoadBalancer implements LoadBalancer {
private final Logger log = LoggerFactory.getLogger(getClass());
@ -41,7 +43,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
private ConnectionManager connectionManager;
private final ReclosableLatch clientsEmpty = new ReclosableLatch();
final Queue<SubscribesConnectionEntry> clients = new ConcurrentLinkedQueue<SubscribesConnectionEntry>();
final Map<RedisClient, SubscribesConnectionEntry> clients = PlatformDependent.newConcurrentHashMap();
public void init(MasterSlaveServersConfig config, ConnectionManager connectionManager) {
this.config = config;
@ -49,7 +51,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
}
public synchronized void add(SubscribesConnectionEntry entry) {
clients.add(entry);
clients.put(entry.getClient(), entry);
if (!entry.isFreezed()) {
clientsEmpty.open();
}
@ -57,7 +59,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
public int getAvailableClients() {
int count = 0;
for (SubscribesConnectionEntry connectionEntry : clients) {
for (SubscribesConnectionEntry connectionEntry : clients.values()) {
if (!connectionEntry.isFreezed()) {
count++;
}
@ -67,7 +69,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
public synchronized void unfreeze(String host, int port) {
InetSocketAddress addr = new InetSocketAddress(host, port);
for (SubscribesConnectionEntry connectionEntry : clients) {
for (SubscribesConnectionEntry connectionEntry : clients.values()) {
if (!connectionEntry.getClient().getAddr().equals(addr)) {
continue;
}
@ -80,7 +82,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
public synchronized Collection<RedisPubSubConnection> freeze(String host, int port) {
InetSocketAddress addr = new InetSocketAddress(host, port);
for (SubscribesConnectionEntry connectionEntry : clients) {
for (SubscribesConnectionEntry connectionEntry : clients.values()) {
if (connectionEntry.isFreezed()
|| !connectionEntry.getClient().getAddr().equals(addr)) {
continue;
@ -109,7 +111,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
boolean allFreezed = true;
for (SubscribesConnectionEntry entry : clients) {
for (SubscribesConnectionEntry entry : clients.values()) {
if (!entry.isFreezed()) {
allFreezed = false;
break;
@ -129,7 +131,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
public RedisPubSubConnection nextPubSubConnection() {
clientsEmpty.awaitUninterruptibly();
List<SubscribesConnectionEntry> clientsCopy = new ArrayList<SubscribesConnectionEntry>(clients);
List<SubscribesConnectionEntry> clientsCopy = new ArrayList<SubscribesConnectionEntry>(clients.values());
while (true) {
if (clientsCopy.isEmpty()) {
throw new RedisConnectionException("Slave subscribe-connection pool gets exhausted!");
@ -170,9 +172,21 @@ abstract class BaseLoadBalancer implements LoadBalancer {
}
}
public RedisConnection getConnection(RedisClient client) {
SubscribesConnectionEntry entry = clients.get(client);
if (entry != null) {
RedisConnection conn = retrieveConnection(entry);
if (conn == null) {
throw new RedisConnectionException("Slave connection pool gets exhausted for " + client);
}
return conn;
}
throw new RedisConnectionException("Can't find entry for " + client);
}
public RedisConnection nextConnection() {
clientsEmpty.awaitUninterruptibly();
List<SubscribesConnectionEntry> clientsCopy = new ArrayList<SubscribesConnectionEntry>(clients);
List<SubscribesConnectionEntry> clientsCopy = new ArrayList<SubscribesConnectionEntry>(clients.values());
while (true) {
if (clientsCopy.isEmpty()) {
throw new RedisConnectionException("Slave connection pool gets exhausted!");
@ -181,22 +195,31 @@ abstract class BaseLoadBalancer implements LoadBalancer {
int index = getIndex(clientsCopy);
SubscribesConnectionEntry entry = clientsCopy.get(index);
if (entry.isFreezed()
|| !entry.getConnectionsSemaphore().tryAcquire()) {
RedisConnection conn = retrieveConnection(entry);
if (conn == null) {
clientsCopy.remove(index);
} else {
RedisConnection conn = entry.getConnections().poll();
if (conn != null) {
return conn;
}
try {
return entry.connect(config);
} catch (RedisException e) {
entry.getConnectionsSemaphore().release();
// TODO connection scoring
log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr());
clientsCopy.remove(index);
}
return conn;
}
}
}
private RedisConnection retrieveConnection(SubscribesConnectionEntry entry) {
if (entry.isFreezed()
|| !entry.getConnectionsSemaphore().tryAcquire()) {
return null;
} else {
RedisConnection conn = entry.getConnections().poll();
if (conn != null) {
return conn;
}
try {
return entry.connect(config);
} catch (RedisException e) {
entry.getConnectionsSemaphore().release();
// TODO connection scoring
log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr());
return null;
}
}
}
@ -204,42 +227,34 @@ abstract class BaseLoadBalancer implements LoadBalancer {
abstract int getIndex(List<SubscribesConnectionEntry> clientsCopy);
public void returnSubscribeConnection(RedisPubSubConnection connection) {
for (SubscribesConnectionEntry entry : clients) {
if (entry.getClient().equals(connection.getRedisClient())) {
if (entry.isFreezed()) {
connection.closeAsync();
} else {
entry.offerFreeSubscribeConnection(connection);
}
entry.getSubscribeConnectionsSemaphore().release();
break;
}
SubscribesConnectionEntry entry = clients.get(connection.getRedisClient());
if (entry.isFreezed()) {
connection.closeAsync();
} else {
entry.offerFreeSubscribeConnection(connection);
}
entry.getSubscribeConnectionsSemaphore().release();
}
public void returnConnection(RedisConnection connection) {
for (SubscribesConnectionEntry entry : clients) {
if (entry.getClient().equals(connection.getRedisClient())) {
if (entry.isFreezed()) {
connection.closeAsync();
} else {
entry.getConnections().add(connection);
}
entry.getConnectionsSemaphore().release();
break;
}
SubscribesConnectionEntry entry = clients.get(connection.getRedisClient());
if (entry.isFreezed()) {
connection.closeAsync();
} else {
entry.getConnections().add(connection);
}
entry.getConnectionsSemaphore().release();
}
public void shutdown() {
for (SubscribesConnectionEntry entry : clients) {
for (SubscribesConnectionEntry entry : clients.values()) {
entry.getClient().shutdown();
}
}
public void shutdownAsync() {
for (SubscribesConnectionEntry entry : clients) {
connectionManager.shutdownAsync(entry.getClient());
for (RedisClient client : clients.keySet()) {
connectionManager.shutdownAsync(client);
}
}

@ -63,6 +63,8 @@ public interface ConnectionManager {
RedisConnection connectionReadOp(int slot);
RedisConnection connectionReadOp(int slot, RedisClient client);
RedisConnection connectionWriteOp(int slot);
<T> FutureListener<T> createReleaseReadListener(int slot,

@ -18,11 +18,14 @@ package org.redisson.connection;
import java.util.Collection;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
public interface LoadBalancer {
RedisConnection getConnection(RedisClient client);
int getAvailableClients();
void shutdownAsync();

@ -18,7 +18,6 @@ package org.redisson.connection;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
@ -466,6 +465,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return e.connectionReadOp();
}
@Override
public RedisConnection connectionReadOp(int slot, RedisClient client) {
MasterSlaveEntry e = getEntry(slot);
if (!e.isOwn(slot)) {
throw new RedisEmptySlotException("No node for slot: " + slot, slot);
}
return e.connectionReadOp(client);
}
RedisPubSubConnection nextPubSubConnection(int slot) {
return getEntry(slot).nextPubSubConnection();
}

@ -146,6 +146,11 @@ public class MasterSlaveEntry {
return slaveBalancer.nextConnection();
}
public RedisConnection connectionReadOp(RedisClient client) {
return slaveBalancer.getConnection(client);
}
RedisPubSubConnection nextPubSubConnection() {
return slaveBalancer.nextPubSubConnection();
}

Loading…
Cancel
Save