diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 564b11337..cdc7544d0 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.redisson.client.RedisAskException; -import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; @@ -38,6 +37,7 @@ import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.connection.ConnectionManager; import org.redisson.connection.NodeSource; +import org.redisson.connection.NodeSource.Redirect; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -105,7 +105,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { @Override protected void async(boolean readOnlyMode, NodeSource nodeSource, MultiDecoder messageDecoder, - Codec codec, RedisCommand command, Object[] params, Promise mainPromise, RedisClient client, int attempt) { + Codec codec, RedisCommand command, Object[] params, Promise mainPromise, int attempt) { if (executed) { throw new IllegalStateException("Batch already executed!"); } @@ -254,12 +254,12 @@ public class CommandBatchExecutorService extends CommandExecutorService { if (future.cause() instanceof RedisMovedException) { RedisMovedException ex = (RedisMovedException)future.cause(); - execute(entry, new NodeSource(ex.getSlot()), mainPromise, slots, attempt); + execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt); return; } if (future.cause() instanceof RedisAskException) { RedisAskException ex = (RedisAskException)future.cause(); - execute(entry, new NodeSource(ex.getAddr()), mainPromise, slots, attempt); + execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt); return; } diff --git a/src/main/java/org/redisson/CommandExecutor.java b/src/main/java/org/redisson/CommandExecutor.java index 143e4723d..49d44fcaf 100644 --- a/src/main/java/org/redisson/CommandExecutor.java +++ b/src/main/java/org/redisson/CommandExecutor.java @@ -15,10 +15,10 @@ */ package org.redisson; +import java.net.InetSocketAddress; import java.util.Collection; import java.util.List; -import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; @@ -33,9 +33,9 @@ import io.netty.util.concurrent.Future; //TODO ping support public interface CommandExecutor { - R read(RedisClient client, String key, Codec codec, RedisCommand command, Object ... params); + R read(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object ... params); - R read(RedisClient client, String key, RedisCommand command, Object ... params); + R read(InetSocketAddress client, String key, RedisCommand command, Object ... params); Future evalWriteAllAsync(RedisCommand command, SlotCallback callback, String script, List keys, Object ... params); diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index 06660b91f..04c496da5 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -15,6 +15,7 @@ */ package org.redisson; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -27,7 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.redisson.client.RedisAskException; -import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; @@ -42,6 +42,7 @@ import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.cluster.ClusterSlotRange; import org.redisson.connection.ConnectionManager; import org.redisson.connection.NodeSource; +import org.redisson.connection.NodeSource.Redirect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +104,7 @@ public class CommandExecutorService implements CommandExecutor { }; for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { - async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, null, 0); + async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, 0); } return mainPromise; } @@ -140,7 +141,7 @@ public class CommandExecutorService implements CommandExecutor { }); ClusterSlotRange slot = slots.remove(0); - async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, attemptPromise, null, 0); + async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, attemptPromise, 0); } public Future writeAllAsync(RedisCommand command, Object ... params) { @@ -177,7 +178,7 @@ public class CommandExecutorService implements CommandExecutor { } }; for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { - async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, null, 0); + async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, 0); } return mainPromise; } @@ -205,28 +206,28 @@ public class CommandExecutorService implements CommandExecutor { return get(res); } - public R read(RedisClient client, String key, RedisCommand command, Object ... params) { + public R read(InetSocketAddress client, String key, RedisCommand command, Object ... params) { Future res = readAsync(client, key, connectionManager.getCodec(), command, params); return get(res); } - public R read(RedisClient client, String key, Codec codec, RedisCommand command, Object ... params) { + public R read(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object ... params) { Future res = readAsync(client, key, codec, command, params); return get(res); } - public Future readAsync(RedisClient client, String key, Codec codec, RedisCommand command, Object ... params) { + public Future readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); int slot = connectionManager.calcSlot(key); - async(true, new NodeSource(slot), null, codec, command, params, mainPromise, client, 0); + async(true, new NodeSource(slot, client), null, codec, command, params, mainPromise, 0); return mainPromise; } public Future readAsync(String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); int slot = connectionManager.calcSlot(key); - async(true, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0); + async(true, new NodeSource(slot), null, codec, command, params, mainPromise, 0); return mainPromise; } @@ -237,7 +238,7 @@ public class CommandExecutorService implements CommandExecutor { public Future writeAsync(Integer slot, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); - async(false, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0); + async(false, new NodeSource(slot), null, codec, command, params, mainPromise, 0); return mainPromise; } @@ -274,9 +275,9 @@ public class CommandExecutorService implements CommandExecutor { try { return operation.execute(codec, connection); } catch (RedisMovedException e) { - return async(readOnlyMode, codec, new NodeSource(e.getSlot()), operation, attempt); + return async(readOnlyMode, codec, new NodeSource(e.getSlot(), e.getAddr(), Redirect.MOVED), operation, attempt); } catch (RedisAskException e) { - return async(readOnlyMode, codec, new NodeSource(e.getAddr()), operation, attempt); + return async(readOnlyMode, codec, new NodeSource(e.getSlot(), e.getAddr(), Redirect.ASK), operation, attempt); } catch (RedisTimeoutException e) { if (attempt == connectionManager.getConfig().getRetryAttempts()) { throw e; @@ -361,7 +362,7 @@ public class CommandExecutorService implements CommandExecutor { args.addAll(keys); args.addAll(Arrays.asList(params)); for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { - async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, args.toArray(), promise, null, 0); + async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, args.toArray(), promise, 0); } return mainPromise; } @@ -374,7 +375,7 @@ public class CommandExecutorService implements CommandExecutor { args.addAll(keys); args.addAll(Arrays.asList(params)); int slot = connectionManager.calcSlot(key); - async(readOnlyMode, new NodeSource(slot), null, codec, evalCommandType, args.toArray(), mainPromise, null, 0); + async(readOnlyMode, new NodeSource(slot), null, codec, evalCommandType, args.toArray(), mainPromise, 0); return mainPromise; } @@ -404,12 +405,12 @@ public class CommandExecutorService implements CommandExecutor { public Future writeAsync(String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); int slot = connectionManager.calcSlot(key); - async(false, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0); + async(false, new NodeSource(slot), null, codec, command, params, mainPromise, 0); return mainPromise; } protected void async(final boolean readOnlyMode, final NodeSource source, final MultiDecoder messageDecoder, final Codec codec, final RedisCommand command, - final Object[] params, final Promise mainPromise, final RedisClient client, final int attempt) { + final Object[] params, final Promise mainPromise, final int attempt) { if (!connectionManager.getShutdownLatch().acquire()) { mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); return; @@ -433,7 +434,7 @@ public class CommandExecutorService implements CommandExecutor { } int count = attempt + 1; - async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, client, count); + async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, count); } }; @@ -442,11 +443,7 @@ public class CommandExecutorService implements CommandExecutor { final Future connectionFuture; if (readOnlyMode) { - if (client != null) { - connectionFuture = connectionManager.connectionReadOp(source, command, client); - } else { - connectionFuture = connectionManager.connectionReadOp(source, command); - } + connectionFuture = connectionManager.connectionReadOp(source, command); } else { connectionFuture = connectionManager.connectionWriteOp(source, command); } @@ -470,8 +467,7 @@ public class CommandExecutorService implements CommandExecutor { RedisConnection connection = connFuture.getNow(); ChannelFuture future = null; - if (source.getAddr() != null) { - // ASK handling + if (source.getRedirect() == Redirect.ASK) { List> list = new ArrayList>(2); Promise promise = connectionManager.newPromise(); list.add(new CommandData(promise, codec, RedisCommands.ASKING, new Object[] {})); @@ -524,7 +520,7 @@ public class CommandExecutorService implements CommandExecutor { RedisMovedException ex = (RedisMovedException)future.cause(); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - async(readOnlyMode, new NodeSource(ex.getSlot()), messageDecoder, codec, command, params, mainPromise, client, attempt); + async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), messageDecoder, codec, command, params, mainPromise, attempt); return; } @@ -535,18 +531,18 @@ public class CommandExecutorService implements CommandExecutor { RedisAskException ex = (RedisAskException)future.cause(); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - async(readOnlyMode, new NodeSource(ex.getAddr()), messageDecoder, codec, command, params, mainPromise, client, attempt); + async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), messageDecoder, codec, command, params, mainPromise, attempt); return; } if (future.isSuccess()) { R res = future.getNow(); if (res instanceof RedisClientResult) { - RedisClient redisClient = client; - if (redisClient == null) { - redisClient = connectionFuture.getNow().getRedisClient(); + InetSocketAddress addr = source.getAddr(); + if (addr == null) { + addr = connectionFuture.getNow().getRedisClient().getAddr(); } - ((RedisClientResult)res).setRedisClient(redisClient); + ((RedisClientResult)res).setRedisClient(addr); } mainPromise.setSuccess(res); } else { diff --git a/src/main/java/org/redisson/RedisClientResult.java b/src/main/java/org/redisson/RedisClientResult.java index 381c2f522..c038fe7fb 100644 --- a/src/main/java/org/redisson/RedisClientResult.java +++ b/src/main/java/org/redisson/RedisClientResult.java @@ -15,12 +15,12 @@ */ package org.redisson; -import org.redisson.client.RedisClient; +import java.net.InetSocketAddress; public interface RedisClientResult { - void setRedisClient(RedisClient client); + void setRedisClient(InetSocketAddress addr); - RedisClient getRedisClient(); + InetSocketAddress getRedisClient(); } diff --git a/src/main/java/org/redisson/RedissonMap.java b/src/main/java/org/redisson/RedissonMap.java index 0d1d991d0..bdbbb05e3 100644 --- a/src/main/java/org/redisson/RedissonMap.java +++ b/src/main/java/org/redisson/RedissonMap.java @@ -16,6 +16,7 @@ package org.redisson; import java.math.BigDecimal; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -27,7 +28,6 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; -import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; @@ -293,7 +293,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { return get(fastRemoveAsync(keys)); } - private MapScanResult scanIterator(RedisClient client, long startPos) { + private MapScanResult scanIterator(InetSocketAddress client, long startPos) { return commandExecutor.read(client, getName(), codec, RedisCommands.HSCAN, getName(), startPos); } @@ -303,7 +303,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { private Map firstValues; private Iterator> iter; private long iterPos = 0; - private RedisClient client; + private InetSocketAddress client; private boolean removeExecuted; private Map.Entry value; diff --git a/src/main/java/org/redisson/RedissonScoredSortedSet.java b/src/main/java/org/redisson/RedissonScoredSortedSet.java index e97fad973..ca810aaf7 100644 --- a/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -16,13 +16,13 @@ package org.redisson; import java.math.BigDecimal; +import java.net.InetSocketAddress; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; -import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; @@ -155,7 +155,7 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK, getName(), o); } - private ListScanResult scanIterator(RedisClient client, long startPos) { + private ListScanResult scanIterator(InetSocketAddress client, long startPos) { return commandExecutor.read(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos); } @@ -165,7 +165,7 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc private List firstValues; private Iterator iter; - private RedisClient client; + private InetSocketAddress client; private long iterPos; private boolean removeExecuted; diff --git a/src/main/java/org/redisson/RedissonSet.java b/src/main/java/org/redisson/RedissonSet.java index d5d563e29..3465fcae4 100644 --- a/src/main/java/org/redisson/RedissonSet.java +++ b/src/main/java/org/redisson/RedissonSet.java @@ -15,6 +15,7 @@ */ package org.redisson; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -22,7 +23,6 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; -import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; @@ -76,7 +76,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { return commandExecutor.readAsync(getName(), codec, RedisCommands.SISMEMBER, getName(), o); } - private ListScanResult scanIterator(RedisClient client, long startPos) { + private ListScanResult scanIterator(InetSocketAddress client, long startPos) { return commandExecutor.read(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos); } @@ -86,7 +86,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { private List firstValues; private Iterator iter; - private RedisClient client; + private InetSocketAddress client; private long iterPos; private boolean removeExecuted; diff --git a/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java b/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java index 3231cf931..830415d5c 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java +++ b/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java @@ -15,16 +15,16 @@ */ package org.redisson.client.protocol.decoder; +import java.net.InetSocketAddress; import java.util.List; import org.redisson.RedisClientResult; -import org.redisson.client.RedisClient; public class ListScanResult implements RedisClientResult { private final Long pos; private final List values; - private RedisClient client; + private InetSocketAddress addr; public ListScanResult(Long pos, List values) { this.pos = pos; @@ -40,12 +40,12 @@ public class ListScanResult implements RedisClientResult { } @Override - public void setRedisClient(RedisClient client) { - this.client = client; + public void setRedisClient(InetSocketAddress addr) { + this.addr = addr; } - public RedisClient getRedisClient() { - return client; + public InetSocketAddress getRedisClient() { + return addr; } } diff --git a/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java b/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java index e54591e4e..4672587b5 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java +++ b/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java @@ -15,16 +15,16 @@ */ package org.redisson.client.protocol.decoder; +import java.net.InetSocketAddress; import java.util.Map; import org.redisson.RedisClientResult; -import org.redisson.client.RedisClient; public class MapScanResult implements RedisClientResult { private final Long pos; private final Map values; - private RedisClient client; + private InetSocketAddress client; public MapScanResult(Long pos, Map values) { super(); @@ -41,11 +41,11 @@ public class MapScanResult implements RedisClientResult { } @Override - public void setRedisClient(RedisClient client) { + public void setRedisClient(InetSocketAddress client) { this.client = client; } - public RedisClient getRedisClient() { + public InetSocketAddress getRedisClient() { return client; } diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index 2672005b4..4250e8dec 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import org.redisson.MasterSlaveServersConfig; -import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisPubSubConnection; @@ -41,7 +40,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { private final Logger log = LoggerFactory.getLogger(getClass()); private ConnectionManager connectionManager; - final Map client2Entry = PlatformDependent.newConcurrentHashMap(); + final Map addr2Entry = PlatformDependent.newConcurrentHashMap(); PubSubConnectionPoll pubSubEntries; @@ -54,14 +53,14 @@ abstract class BaseLoadBalancer implements LoadBalancer { } public synchronized void add(SubscribesConnectionEntry entry) { - client2Entry.put(entry.getClient(), entry); + addr2Entry.put(entry.getClient().getAddr(), entry); entries.add(entry); pubSubEntries.add(entry); } public int getAvailableClients() { int count = 0; - for (SubscribesConnectionEntry connectionEntry : client2Entry.values()) { + for (SubscribesConnectionEntry connectionEntry : addr2Entry.values()) { if (!connectionEntry.isFreezed()) { count++; } @@ -71,7 +70,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { public synchronized boolean unfreeze(String host, int port, FreezeReason freezeReason) { InetSocketAddress addr = new InetSocketAddress(host, port); - for (SubscribesConnectionEntry connectionEntry : client2Entry.values()) { + for (SubscribesConnectionEntry connectionEntry : addr2Entry.values()) { if (!connectionEntry.getClient().getAddr().equals(addr)) { continue; } @@ -91,7 +90,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { public synchronized Collection freeze(String host, int port, FreezeReason freezeReason) { InetSocketAddress addr = new InetSocketAddress(host, port); - for (SubscribesConnectionEntry connectionEntry : client2Entry.values()) { + for (SubscribesConnectionEntry connectionEntry : addr2Entry.values()) { if (connectionEntry.isFreezed() || !connectionEntry.getClient().getAddr().equals(addr)) { continue; @@ -136,12 +135,12 @@ abstract class BaseLoadBalancer implements LoadBalancer { return pubSubEntries.get(); } - public Future getConnection(RedisClient client) { - SubscribesConnectionEntry entry = client2Entry.get(client); + public Future getConnection(InetSocketAddress addr) { + SubscribesConnectionEntry entry = addr2Entry.get(addr); if (entry != null) { return entries.get(entry); } - RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + client.getAddr()); + RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + addr); return connectionManager.getGroup().next().newFailedFuture(exception); } @@ -150,24 +149,24 @@ abstract class BaseLoadBalancer implements LoadBalancer { } public void returnSubscribeConnection(RedisPubSubConnection connection) { - SubscribesConnectionEntry entry = client2Entry.get(connection.getRedisClient()); + SubscribesConnectionEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); pubSubEntries.returnConnection(entry, connection); } public void returnConnection(RedisConnection connection) { - SubscribesConnectionEntry entry = client2Entry.get(connection.getRedisClient()); + SubscribesConnectionEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); entries.returnConnection(entry, connection); } public void shutdown() { - for (SubscribesConnectionEntry entry : client2Entry.values()) { + for (SubscribesConnectionEntry entry : addr2Entry.values()) { entry.getClient().shutdown(); } } public void shutdownAsync() { - for (RedisClient client : client2Entry.keySet()) { - connectionManager.shutdownAsync(client); + for (SubscribesConnectionEntry entry : addr2Entry.values()) { + connectionManager.shutdownAsync(entry.getClient()); } } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 6b9bb35de..7a823c42f 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -72,8 +72,6 @@ public interface ConnectionManager { Future connectionReadOp(NodeSource source, RedisCommand command); - Future connectionReadOp(NodeSource source, RedisCommand command, RedisClient client); - Future connectionWriteOp(NodeSource source, RedisCommand command); FutureListener createReleaseReadListener(NodeSource source, diff --git a/src/main/java/org/redisson/connection/LoadBalancer.java b/src/main/java/org/redisson/connection/LoadBalancer.java index 7643b42d9..83ba30466 100644 --- a/src/main/java/org/redisson/connection/LoadBalancer.java +++ b/src/main/java/org/redisson/connection/LoadBalancer.java @@ -15,11 +15,11 @@ */ package org.redisson.connection; +import java.net.InetSocketAddress; import java.util.Collection; import java.util.List; import org.redisson.MasterSlaveServersConfig; -import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; import org.redisson.connection.ConnectionEntry.FreezeReason; @@ -30,7 +30,7 @@ public interface LoadBalancer { SubscribesConnectionEntry getEntry(List clientsCopy); - Future getConnection(RedisClient client); + Future getConnection(InetSocketAddress addr); int getAvailableClients(); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index d205446ac..739ee241a 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -571,33 +571,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } private MasterSlaveEntry getEntry(NodeSource source) { - MasterSlaveEntry e = null; - if (source.getSlot() != null) { - e = getEntry(source.getSlot()); - if (e == null) { - throw new RedisNodeNotFoundException("No node with slot: " + source.getSlot()); - } - } else { - e = getEntry(source.getAddr()); - if (e == null) { - throw new RedisNodeNotFoundException("No node with addr: " + source.getAddr()); - } + MasterSlaveEntry e = getEntry(source.getSlot()); + if (e == null) { + throw new RedisNodeNotFoundException("No node with slot: " + source.getSlot()); } return e; } private MasterSlaveEntry getEntry(NodeSource source, RedisCommand command) { - MasterSlaveEntry e = null; - if (source.getSlot() != null) { - e = getEntry(source.getSlot()); - if (e == null) { - throw new RedisNodeNotFoundException("No node for slot: " + source.getSlot() + " and command " + command); - } - } else { - e = getEntry(source.getAddr()); - if (e == null) { - throw new RedisNodeNotFoundException("No node for addr: " + source.getAddr() + " and command " + command); - } + MasterSlaveEntry e = getEntry(source.getSlot()); + if (e == null) { + throw new RedisNodeNotFoundException("No node for slot: " + source.getSlot() + " and command " + command); } return e; } @@ -605,15 +589,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public Future connectionReadOp(NodeSource source, RedisCommand command) { MasterSlaveEntry e = getEntry(source, command); + if (source.getAddr() != null) { + return e.connectionReadOp(source.getAddr()); + } return e.connectionReadOp(); } - @Override - public Future connectionReadOp(NodeSource source, RedisCommand command, RedisClient client) { - MasterSlaveEntry e = getEntry(source, command); - return e.connectionReadOp(client); - } - Future nextPubSubConnection(int slot) { return getEntry(slot).nextPubSubConnection(); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 0409a635e..83d030b1f 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -154,8 +154,8 @@ public class MasterSlaveEntry { return slaveBalancer.nextConnection(); } - public Future connectionReadOp(RedisClient client) { - return slaveBalancer.getConnection(client); + public Future connectionReadOp(InetSocketAddress addr) { + return slaveBalancer.getConnection(addr); } diff --git a/src/main/java/org/redisson/connection/NodeSource.java b/src/main/java/org/redisson/connection/NodeSource.java index c8963f862..ee65405f9 100644 --- a/src/main/java/org/redisson/connection/NodeSource.java +++ b/src/main/java/org/redisson/connection/NodeSource.java @@ -19,20 +19,28 @@ import java.net.InetSocketAddress; public class NodeSource { + public enum Redirect {MOVED, ASK} + private final Integer slot; private final InetSocketAddress addr; + private final Redirect redirect; public NodeSource(Integer slot) { - this(slot, null); + this(slot, null, null); } - public NodeSource(InetSocketAddress addr) { - this(null, addr); + public NodeSource(Integer slot, InetSocketAddress addr) { + this(slot, addr, null); } - private NodeSource(Integer slot, InetSocketAddress addr) { + public NodeSource(Integer slot, InetSocketAddress addr, Redirect redirect) { this.slot = slot; this.addr = addr; + this.redirect = redirect; + } + + public Redirect getRedirect() { + return redirect; } public Integer getSlot() { @@ -45,10 +53,7 @@ public class NodeSource { @Override public String toString() { - if (addr != null) { - return addr.toString(); - } - return slot.toString(); + return "NodeSource [slot=" + slot + ", addr=" + addr + ", redirect=" + redirect + "]"; } } diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index 68648ff41..31fc2b257 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -15,6 +15,7 @@ */ package org.redisson.connection; +import java.net.InetSocketAddress; import java.util.Set; import org.redisson.MasterSlaveServersConfig; @@ -56,6 +57,11 @@ public class SingleEntry extends MasterSlaveEntry { pubSubConnectionHolder.returnConnection(masterEntry, entry.getConnection()); } + @Override + public Future connectionReadOp(InetSocketAddress addr) { + return super.connectionWriteOp(); + } + @Override public Future connectionReadOp() { return super.connectionWriteOp();