diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 330b94736..8c4504996 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -24,11 +24,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.redisson.client.WriteRedisConnectionException; -import org.redisson.client.RedisConnectionException; +import org.redisson.client.RedisClient; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; import org.redisson.client.RedisTimeoutException; +import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; @@ -102,7 +102,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { @Override protected void async(boolean readOnlyMode, int slot, MultiDecoder messageDecoder, - Codec codec, RedisCommand command, Object[] params, Promise mainPromise, int attempt) { + Codec codec, RedisCommand command, Object[] params, Promise mainPromise, RedisClient client, int attempt) { if (executed) { throw new IllegalStateException("Batch already executed!"); } diff --git a/src/main/java/org/redisson/CommandExecutor.java b/src/main/java/org/redisson/CommandExecutor.java index 22fe5acea..67bf29af0 100644 --- a/src/main/java/org/redisson/CommandExecutor.java +++ b/src/main/java/org/redisson/CommandExecutor.java @@ -18,6 +18,7 @@ package org.redisson; import java.util.Collection; import java.util.List; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; @@ -32,6 +33,8 @@ import io.netty.util.concurrent.Future; //TODO ping support public interface CommandExecutor { + R read(RedisClient client, String key, RedisCommand command, Object ... params); + Future evalWriteAllAsync(RedisCommand command, SlotCallback callback, String script, List keys, Object ... params); Future writeAllAsync(RedisCommand command, SlotCallback callback, Object ... params); diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index 9f3170350..d0929561f 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -26,12 +26,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; -import org.redisson.client.WriteRedisConnectionException; -import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; import org.redisson.client.RedisTimeoutException; +import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.RedisCommand; @@ -98,7 +98,7 @@ public class CommandExecutorService implements CommandExecutor { }; for (Integer slot : connectionManager.getEntries().keySet()) { - async(true, slot, null, connectionManager.getCodec(), command, params, promise, 0); + async(true, slot, null, connectionManager.getCodec(), command, params, promise, null, 0); } return mainPromise; } @@ -135,7 +135,7 @@ public class CommandExecutorService implements CommandExecutor { }); Integer slot = slots.remove(0); - async(true, slot, null, connectionManager.getCodec(), command, params, attemptPromise, 0); + async(true, slot, null, connectionManager.getCodec(), command, params, attemptPromise, null, 0); } public Future writeAllAsync(RedisCommand command, Object ... params) { @@ -172,7 +172,7 @@ public class CommandExecutorService implements CommandExecutor { } }; for (Integer slot : connectionManager.getEntries().keySet()) { - async(readOnlyMode, slot, null, connectionManager.getCodec(), command, params, promise, 0); + async(readOnlyMode, slot, null, connectionManager.getCodec(), command, params, promise, null, 0); } return mainPromise; } @@ -196,10 +196,22 @@ public class CommandExecutorService implements CommandExecutor { return get(res); } + public R read(RedisClient client, String key, RedisCommand command, Object ... params) { + Future res = readAsync(client, key, connectionManager.getCodec(), command, params); + return get(res); + } + + public Future readAsync(RedisClient client, String key, Codec codec, RedisCommand command, Object ... params) { + Promise mainPromise = connectionManager.newPromise(); + int slot = connectionManager.calcSlot(key); + async(true, slot, null, codec, command, params, mainPromise, client, 0); + return mainPromise; + } + public Future readAsync(String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); int slot = connectionManager.calcSlot(key); - async(true, slot, null, codec, command, params, mainPromise, 0); + async(true, slot, null, codec, command, params, mainPromise, null, 0); return mainPromise; } @@ -210,7 +222,7 @@ public class CommandExecutorService implements CommandExecutor { public Future writeAsync(Integer slot, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); - async(false, slot, null, codec, command, params, mainPromise, 0); + async(false, slot, null, codec, command, params, mainPromise, null, 0); return mainPromise; } @@ -328,7 +340,7 @@ public class CommandExecutorService implements CommandExecutor { args.addAll(keys); args.addAll(Arrays.asList(params)); for (Integer slot : connectionManager.getEntries().keySet()) { - async(readOnlyMode, slot, null, connectionManager.getCodec(), command, args.toArray(), promise, 0); + async(readOnlyMode, slot, null, connectionManager.getCodec(), command, args.toArray(), promise, null, 0); } return mainPromise; } @@ -341,7 +353,7 @@ public class CommandExecutorService implements CommandExecutor { args.addAll(keys); args.addAll(Arrays.asList(params)); int slot = connectionManager.calcSlot(key); - async(readOnlyMode, slot, null, codec, evalCommandType, args.toArray(), mainPromise, 0); + async(readOnlyMode, slot, null, codec, evalCommandType, args.toArray(), mainPromise, null, 0); return mainPromise; } @@ -371,12 +383,12 @@ public class CommandExecutorService implements CommandExecutor { public Future writeAsync(String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); int slot = connectionManager.calcSlot(key); - async(false, slot, null, codec, command, params, mainPromise, 0); + async(false, slot, null, codec, command, params, mainPromise, null, 0); return mainPromise; } protected void async(final boolean readOnlyMode, final int slot, final MultiDecoder messageDecoder, final Codec codec, final RedisCommand command, - final Object[] params, final Promise mainPromise, final int attempt) { + final Object[] params, final Promise mainPromise, final RedisClient client, final int attempt) { if (!connectionManager.getShutdownLatch().acquire()) { mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); return; @@ -400,18 +412,22 @@ public class CommandExecutorService implements CommandExecutor { } int count = attempt + 1; - async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, count); + async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, client, count); } }; try { - org.redisson.client.RedisConnection connection; + RedisConnection connection; if (readOnlyMode) { - connection = connectionManager.connectionReadOp(slot); + if (client != null) { + connection = connectionManager.connectionReadOp(slot, client); + } else { + connection = connectionManager.connectionReadOp(slot); + } } else { connection = connectionManager.connectionWriteOp(slot); } - log.debug("getting connection for command {} via slot {} using {}", command, slot, connection.getRedisClient().getAddr()); + log.debug("getting connection for command {} from slot {} using node {}", command, slot, connection.getRedisClient().getAddr()); ChannelFuture future = connection.send(new CommandData(attemptPromise, messageDecoder, codec, command, params)); ex.set(new RedisTimeoutException()); @@ -449,12 +465,16 @@ public class CommandExecutorService implements CommandExecutor { if (future.cause() instanceof RedisMovedException) { RedisMovedException ex = (RedisMovedException)future.cause(); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, attempt); + async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, client, attempt); return; } if (future.isSuccess()) { - mainPromise.setSuccess(future.getNow()); + R res = future.getNow(); + if (res instanceof RedisClientResult) { + ((RedisClientResult)res).setRedisClient(client); + } + mainPromise.setSuccess(res); } else { mainPromise.setFailure(future.cause()); } diff --git a/src/main/java/org/redisson/RedisClientResult.java b/src/main/java/org/redisson/RedisClientResult.java new file mode 100644 index 000000000..2fe09b3ab --- /dev/null +++ b/src/main/java/org/redisson/RedisClientResult.java @@ -0,0 +1,11 @@ +package org.redisson; + +import org.redisson.client.RedisClient; + +public interface RedisClientResult { + + void setRedisClient(RedisClient client); + + RedisClient getRedisClient(); + +} diff --git a/src/main/java/org/redisson/RedissonMap.java b/src/main/java/org/redisson/RedissonMap.java index 664c4e61d..96d9cdde8 100644 --- a/src/main/java/org/redisson/RedissonMap.java +++ b/src/main/java/org/redisson/RedissonMap.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; +import org.redisson.client.RedisClient; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -277,8 +278,8 @@ public class RedissonMap extends RedissonExpirable implements RMap { return get(fastRemoveAsync(keys)); } - private MapScanResult scanIterator(long startPos) { - return commandExecutor.read(getName(), RedisCommands.HSCAN, getName(), startPos); + private MapScanResult scanIterator(RedisClient client, long startPos) { + return commandExecutor.read(client, getName(), RedisCommands.HSCAN, getName(), startPos); } private Iterator> iterator() { @@ -286,6 +287,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { private Iterator> iter; private long iterPos = 0; + private RedisClient client; private boolean removeExecuted; private Map.Entry value; @@ -294,7 +296,10 @@ public class RedissonMap extends RedissonExpirable implements RMap { public boolean hasNext() { if (iter == null || (!iter.hasNext() && iterPos != 0)) { - MapScanResult res = scanIterator(iterPos); + MapScanResult res = scanIterator(client, iterPos); + if (iter == null) { + client = res.getRedisClient(); + } iter = ((Map)res.getMap()).entrySet().iterator(); iterPos = res.getPos(); } diff --git a/src/main/java/org/redisson/RedissonSet.java b/src/main/java/org/redisson/RedissonSet.java index e37bae3cb..13343a60f 100644 --- a/src/main/java/org/redisson/RedissonSet.java +++ b/src/main/java/org/redisson/RedissonSet.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import org.redisson.client.RedisClient; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; @@ -68,8 +69,8 @@ public class RedissonSet extends RedissonExpirable implements RSet { return commandExecutor.readAsync(getName(), RedisCommands.SISMEMBER, getName(), o); } - private ListScanResult scanIterator(long startPos) { - return commandExecutor.read(getName(), RedisCommands.SSCAN, getName(), startPos); + private ListScanResult scanIterator(RedisClient client, long startPos) { + return commandExecutor.read(client, getName(), RedisCommands.SSCAN, getName(), startPos); } @Override @@ -77,6 +78,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { return new Iterator() { private Iterator iter; + private RedisClient client; private Long iterPos; private boolean removeExecuted; @@ -85,11 +87,12 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public boolean hasNext() { if (iter == null) { - ListScanResult res = scanIterator(0); + ListScanResult res = scanIterator(null, 0); + client = res.getRedisClient(); iter = res.getValues().iterator(); iterPos = res.getPos(); } else if (!iter.hasNext() && iterPos != 0) { - ListScanResult res = scanIterator(iterPos); + ListScanResult res = scanIterator(client, iterPos); iter = res.getValues().iterator(); iterPos = res.getPos(); } diff --git a/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java b/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java index 4e90ad130..3231cf931 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java +++ b/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java @@ -17,10 +17,14 @@ package org.redisson.client.protocol.decoder; import java.util.List; -public class ListScanResult { +import org.redisson.RedisClientResult; +import org.redisson.client.RedisClient; + +public class ListScanResult implements RedisClientResult { private final Long pos; private final List values; + private RedisClient client; public ListScanResult(Long pos, List values) { this.pos = pos; @@ -35,4 +39,13 @@ public class ListScanResult { return values; } + @Override + public void setRedisClient(RedisClient client) { + this.client = client; + } + + public RedisClient getRedisClient() { + return client; + } + } diff --git a/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java b/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java index 60028e211..e54591e4e 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java +++ b/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java @@ -17,10 +17,14 @@ package org.redisson.client.protocol.decoder; import java.util.Map; -public class MapScanResult { +import org.redisson.RedisClientResult; +import org.redisson.client.RedisClient; + +public class MapScanResult implements RedisClientResult { private final Long pos; private final Map values; + private RedisClient client; public MapScanResult(Long pos, Map values) { super(); @@ -36,4 +40,13 @@ public class MapScanResult { return values; } + @Override + public void setRedisClient(RedisClient client) { + this.client = client; + } + + public RedisClient getRedisClient() { + return client; + } + } diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index 9b6f5bccb..5a0d8330e 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -20,10 +20,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Map; import org.redisson.MasterSlaveServersConfig; +import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisException; @@ -33,6 +33,8 @@ import org.redisson.misc.ReclosableLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.internal.PlatformDependent; + abstract class BaseLoadBalancer implements LoadBalancer { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -41,7 +43,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { private ConnectionManager connectionManager; private final ReclosableLatch clientsEmpty = new ReclosableLatch(); - final Queue clients = new ConcurrentLinkedQueue(); + final Map clients = PlatformDependent.newConcurrentHashMap(); public void init(MasterSlaveServersConfig config, ConnectionManager connectionManager) { this.config = config; @@ -49,7 +51,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { } public synchronized void add(SubscribesConnectionEntry entry) { - clients.add(entry); + clients.put(entry.getClient(), entry); if (!entry.isFreezed()) { clientsEmpty.open(); } @@ -57,7 +59,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { public int getAvailableClients() { int count = 0; - for (SubscribesConnectionEntry connectionEntry : clients) { + for (SubscribesConnectionEntry connectionEntry : clients.values()) { if (!connectionEntry.isFreezed()) { count++; } @@ -67,7 +69,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { public synchronized void unfreeze(String host, int port) { InetSocketAddress addr = new InetSocketAddress(host, port); - for (SubscribesConnectionEntry connectionEntry : clients) { + for (SubscribesConnectionEntry connectionEntry : clients.values()) { if (!connectionEntry.getClient().getAddr().equals(addr)) { continue; } @@ -80,7 +82,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { public synchronized Collection freeze(String host, int port) { InetSocketAddress addr = new InetSocketAddress(host, port); - for (SubscribesConnectionEntry connectionEntry : clients) { + for (SubscribesConnectionEntry connectionEntry : clients.values()) { if (connectionEntry.isFreezed() || !connectionEntry.getClient().getAddr().equals(addr)) { continue; @@ -109,7 +111,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { boolean allFreezed = true; - for (SubscribesConnectionEntry entry : clients) { + for (SubscribesConnectionEntry entry : clients.values()) { if (!entry.isFreezed()) { allFreezed = false; break; @@ -129,7 +131,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { public RedisPubSubConnection nextPubSubConnection() { clientsEmpty.awaitUninterruptibly(); - List clientsCopy = new ArrayList(clients); + List clientsCopy = new ArrayList(clients.values()); while (true) { if (clientsCopy.isEmpty()) { throw new RedisConnectionException("Slave subscribe-connection pool gets exhausted!"); @@ -170,9 +172,21 @@ abstract class BaseLoadBalancer implements LoadBalancer { } } + public RedisConnection getConnection(RedisClient client) { + SubscribesConnectionEntry entry = clients.get(client); + if (entry != null) { + RedisConnection conn = retrieveConnection(entry); + if (conn == null) { + throw new RedisConnectionException("Slave connection pool gets exhausted for " + client); + } + return conn; + } + throw new RedisConnectionException("Can't find entry for " + client); + } + public RedisConnection nextConnection() { clientsEmpty.awaitUninterruptibly(); - List clientsCopy = new ArrayList(clients); + List clientsCopy = new ArrayList(clients.values()); while (true) { if (clientsCopy.isEmpty()) { throw new RedisConnectionException("Slave connection pool gets exhausted!"); @@ -181,22 +195,31 @@ abstract class BaseLoadBalancer implements LoadBalancer { int index = getIndex(clientsCopy); SubscribesConnectionEntry entry = clientsCopy.get(index); - if (entry.isFreezed() - || !entry.getConnectionsSemaphore().tryAcquire()) { + RedisConnection conn = retrieveConnection(entry); + if (conn == null) { clientsCopy.remove(index); } else { - RedisConnection conn = entry.getConnections().poll(); - if (conn != null) { - return conn; - } - try { - return entry.connect(config); - } catch (RedisException e) { - entry.getConnectionsSemaphore().release(); - // TODO connection scoring - log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr()); - clientsCopy.remove(index); - } + return conn; + } + } + } + + private RedisConnection retrieveConnection(SubscribesConnectionEntry entry) { + if (entry.isFreezed() + || !entry.getConnectionsSemaphore().tryAcquire()) { + return null; + } else { + RedisConnection conn = entry.getConnections().poll(); + if (conn != null) { + return conn; + } + try { + return entry.connect(config); + } catch (RedisException e) { + entry.getConnectionsSemaphore().release(); + // TODO connection scoring + log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr()); + return null; } } } @@ -204,42 +227,34 @@ abstract class BaseLoadBalancer implements LoadBalancer { abstract int getIndex(List clientsCopy); public void returnSubscribeConnection(RedisPubSubConnection connection) { - for (SubscribesConnectionEntry entry : clients) { - if (entry.getClient().equals(connection.getRedisClient())) { - if (entry.isFreezed()) { - connection.closeAsync(); - } else { - entry.offerFreeSubscribeConnection(connection); - } - entry.getSubscribeConnectionsSemaphore().release(); - break; - } + SubscribesConnectionEntry entry = clients.get(connection.getRedisClient()); + if (entry.isFreezed()) { + connection.closeAsync(); + } else { + entry.offerFreeSubscribeConnection(connection); } + entry.getSubscribeConnectionsSemaphore().release(); } public void returnConnection(RedisConnection connection) { - for (SubscribesConnectionEntry entry : clients) { - if (entry.getClient().equals(connection.getRedisClient())) { - if (entry.isFreezed()) { - connection.closeAsync(); - } else { - entry.getConnections().add(connection); - } - entry.getConnectionsSemaphore().release(); - break; - } + SubscribesConnectionEntry entry = clients.get(connection.getRedisClient()); + if (entry.isFreezed()) { + connection.closeAsync(); + } else { + entry.getConnections().add(connection); } + entry.getConnectionsSemaphore().release(); } public void shutdown() { - for (SubscribesConnectionEntry entry : clients) { + for (SubscribesConnectionEntry entry : clients.values()) { entry.getClient().shutdown(); } } public void shutdownAsync() { - for (SubscribesConnectionEntry entry : clients) { - connectionManager.shutdownAsync(entry.getClient()); + for (RedisClient client : clients.keySet()) { + connectionManager.shutdownAsync(client); } } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index cbe33a1e4..f7bcf5d9e 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -63,6 +63,8 @@ public interface ConnectionManager { RedisConnection connectionReadOp(int slot); + RedisConnection connectionReadOp(int slot, RedisClient client); + RedisConnection connectionWriteOp(int slot); FutureListener createReleaseReadListener(int slot, diff --git a/src/main/java/org/redisson/connection/LoadBalancer.java b/src/main/java/org/redisson/connection/LoadBalancer.java index 1f0f8c044..8c009ef8d 100644 --- a/src/main/java/org/redisson/connection/LoadBalancer.java +++ b/src/main/java/org/redisson/connection/LoadBalancer.java @@ -18,11 +18,14 @@ package org.redisson.connection; import java.util.Collection; import org.redisson.MasterSlaveServersConfig; +import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; public interface LoadBalancer { + RedisConnection getConnection(RedisClient client); + int getAvailableClients(); void shutdownAsync(); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index bc929b429..08f7928ee 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -18,7 +18,6 @@ package org.redisson.connection; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Set; @@ -466,6 +465,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return e.connectionReadOp(); } + @Override + public RedisConnection connectionReadOp(int slot, RedisClient client) { + MasterSlaveEntry e = getEntry(slot); + if (!e.isOwn(slot)) { + throw new RedisEmptySlotException("No node for slot: " + slot, slot); + } + return e.connectionReadOp(client); + } + RedisPubSubConnection nextPubSubConnection(int slot) { return getEntry(slot).nextPubSubConnection(); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 1be6af126..dde924220 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -146,6 +146,11 @@ public class MasterSlaveEntry { return slaveBalancer.nextConnection(); } + public RedisConnection connectionReadOp(RedisClient client) { + return slaveBalancer.getConnection(client); + } + + RedisPubSubConnection nextPubSubConnection() { return slaveBalancer.nextPubSubConnection(); }