From 14262bd25ea1739d2a7f3788c8fd67f38ec92182 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 9 Nov 2015 20:48:10 +0300 Subject: [PATCH] Redirect exceptions refactoring --- .../redisson/client/RedisAskException.java | 19 ++---------- .../redisson/client/RedisMovedException.java | 12 ++------ .../client/RedisRedirectException.java | 30 +++++++++++++++++++ .../client/handler/CommandDecoder.java | 7 +++-- .../cluster/ClusterConnectionListener.java | 6 ++-- .../redisson/connection/ConnectionEntry.java | 18 +++++------ .../connection/ConnectionListener.java | 4 +-- .../connection/DefaultConnectionListener.java | 4 +-- .../redisson/connection/MasterSlaveEntry.java | 12 ++++---- .../org/redisson/connection/SingleEntry.java | 4 +-- .../connection/SubscribesConnectionEntry.java | 2 +- .../org/redisson/misc/ConnectionPool.java | 7 +++-- 12 files changed, 70 insertions(+), 55 deletions(-) create mode 100644 src/main/java/org/redisson/client/RedisRedirectException.java diff --git a/src/main/java/org/redisson/client/RedisAskException.java b/src/main/java/org/redisson/client/RedisAskException.java index e85e6e1bb..3cdf2fa44 100644 --- a/src/main/java/org/redisson/client/RedisAskException.java +++ b/src/main/java/org/redisson/client/RedisAskException.java @@ -15,25 +15,12 @@ */ package org.redisson.client; -import java.net.InetSocketAddress; -import java.net.URI; - -public class RedisAskException extends RedisException { +public class RedisAskException extends RedisRedirectException { private static final long serialVersionUID = -6969734163155547631L; - private URI url; - - public RedisAskException(String url) { - this.url = URI.create("//" + url); - } - - public URI getUrl() { - return url; - } - - public InetSocketAddress getAddr() { - return new InetSocketAddress(url.getHost(), url.getPort()); + public RedisAskException(int slot, String url) { + super(slot, url); } } diff --git a/src/main/java/org/redisson/client/RedisMovedException.java b/src/main/java/org/redisson/client/RedisMovedException.java index 616ba4686..1581e931d 100644 --- a/src/main/java/org/redisson/client/RedisMovedException.java +++ b/src/main/java/org/redisson/client/RedisMovedException.java @@ -15,18 +15,12 @@ */ package org.redisson.client; -public class RedisMovedException extends RedisException { +public class RedisMovedException extends RedisRedirectException { private static final long serialVersionUID = -6969734163155547631L; - private int slot; - - public RedisMovedException(int slot) { - this.slot = slot; - } - - public int getSlot() { - return slot; + public RedisMovedException(int slot, String url) { + super(slot, url); } } diff --git a/src/main/java/org/redisson/client/RedisRedirectException.java b/src/main/java/org/redisson/client/RedisRedirectException.java new file mode 100644 index 000000000..e9413e6ed --- /dev/null +++ b/src/main/java/org/redisson/client/RedisRedirectException.java @@ -0,0 +1,30 @@ +package org.redisson.client; + +import java.net.InetSocketAddress; +import java.net.URI; + +class RedisRedirectException extends RedisException { + + private static final long serialVersionUID = 181505625075250011L; + + private int slot; + private URI url; + + public RedisRedirectException(int slot, String url) { + this.slot = slot; + this.url = URI.create("//" + url); + } + + public int getSlot() { + return slot; + } + + public URI getUrl() { + return url; + } + + public InetSocketAddress getAddr() { + return new InetSocketAddress(url.getHost(), url.getPort()); + } + +} diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index 68e238f08..b5029ffe6 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -165,10 +165,13 @@ public class CommandDecoder extends ReplayingDecoder { if (error.startsWith("MOVED")) { String[] errorParts = error.split(" "); int slot = Integer.valueOf(errorParts[1]); - data.getPromise().setFailure(new RedisMovedException(slot)); + String addr = errorParts[2]; + data.getPromise().setFailure(new RedisMovedException(slot, addr)); } else if (error.startsWith("ASK")) { String[] errorParts = error.split(" "); - data.getPromise().setFailure(new RedisAskException(errorParts[2])); + int slot = Integer.valueOf(errorParts[1]); + String addr = errorParts[2]; + data.getPromise().setFailure(new RedisAskException(slot, addr)); } else { data.getPromise().setFailure(new RedisException(error + ". channel: " + channel + " command: " + data)); } diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionListener.java b/src/main/java/org/redisson/cluster/ClusterConnectionListener.java index 5b5a44901..7ceeb67a4 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionListener.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionListener.java @@ -18,7 +18,7 @@ package org.redisson.cluster; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisException; import org.redisson.client.protocol.RedisCommands; -import org.redisson.connection.ConnectionEntry.Mode; +import org.redisson.connection.ConnectionEntry.NodeType; import org.redisson.connection.DefaultConnectionListener; import org.redisson.connection.FutureConnectionListener; @@ -31,9 +31,9 @@ public class ClusterConnectionListener extends DefaultConnectionListener { } @Override - public void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener) throws RedisException { + public void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener) throws RedisException { super.onConnect(config, serverMode, connectionListener); - if (serverMode == Mode.SLAVE && readFromSlaves) { + if (serverMode == NodeType.SLAVE && readFromSlaves) { connectionListener.addCommand(RedisCommands.READONLY); } } diff --git a/src/main/java/org/redisson/connection/ConnectionEntry.java b/src/main/java/org/redisson/connection/ConnectionEntry.java index d09af2bae..71c877ad0 100644 --- a/src/main/java/org/redisson/connection/ConnectionEntry.java +++ b/src/main/java/org/redisson/connection/ConnectionEntry.java @@ -41,23 +41,23 @@ public class ConnectionEntry { private FreezeReason freezeReason; final RedisClient client; - public enum Mode {SLAVE, MASTER} + public enum NodeType {SLAVE, MASTER} - private final Mode serverMode; + private final NodeType nodeType; private final ConnectionListener connectListener; private final Queue connections = new ConcurrentLinkedQueue(); private final AtomicInteger connectionsCounter = new AtomicInteger(); private AtomicInteger failedAttempts = new AtomicInteger(); - public ConnectionEntry(RedisClient client, int poolSize, ConnectionListener connectListener, Mode serverMode) { + public ConnectionEntry(RedisClient client, int poolSize, ConnectionListener connectListener, NodeType nodeType) { this.client = client; this.connectionsCounter.set(poolSize); this.connectListener = connectListener; - this.serverMode = serverMode; + this.nodeType = nodeType; } - public Mode getServerMode() { - return serverMode; + public NodeType getNodeType() { + return nodeType; } public void resetFailedAttempts() { @@ -129,7 +129,7 @@ public class ConnectionEntry { log.debug("new connection created: {}", conn); FutureConnectionListener listener = new FutureConnectionListener(connectionFuture, conn); - connectListener.onConnect(config, serverMode, listener); + connectListener.onConnect(config, nodeType, listener); listener.executeCommands(); addReconnectListener(config, conn); @@ -144,7 +144,7 @@ public class ConnectionEntry { @Override public void onReconnect(RedisConnection conn, Promise connectionFuture) { FutureConnectionListener listener = new FutureConnectionListener(connectionFuture, conn); - connectListener.onConnect(config, serverMode, listener); + connectListener.onConnect(config, nodeType, listener); listener.executeCommands(); } }); @@ -163,7 +163,7 @@ public class ConnectionEntry { log.debug("new pubsub connection created: {}", conn); FutureConnectionListener listener = new FutureConnectionListener(connectionFuture, conn); - connectListener.onConnect(config, serverMode, listener); + connectListener.onConnect(config, nodeType, listener); listener.executeCommands(); addReconnectListener(config, conn); diff --git a/src/main/java/org/redisson/connection/ConnectionListener.java b/src/main/java/org/redisson/connection/ConnectionListener.java index e9a9a5f7f..697e4d360 100644 --- a/src/main/java/org/redisson/connection/ConnectionListener.java +++ b/src/main/java/org/redisson/connection/ConnectionListener.java @@ -17,10 +17,10 @@ package org.redisson.connection; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisException; -import org.redisson.connection.ConnectionEntry.Mode; +import org.redisson.connection.ConnectionEntry.NodeType; public interface ConnectionListener { - void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener) throws RedisException; + void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener) throws RedisException; } diff --git a/src/main/java/org/redisson/connection/DefaultConnectionListener.java b/src/main/java/org/redisson/connection/DefaultConnectionListener.java index 47577d3f0..2f851a079 100644 --- a/src/main/java/org/redisson/connection/DefaultConnectionListener.java +++ b/src/main/java/org/redisson/connection/DefaultConnectionListener.java @@ -18,12 +18,12 @@ package org.redisson.connection; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisException; import org.redisson.client.protocol.RedisCommands; -import org.redisson.connection.ConnectionEntry.Mode; +import org.redisson.connection.ConnectionEntry.NodeType; public class DefaultConnectionListener implements ConnectionListener { @Override - public void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener) + public void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener) throws RedisException { if (config.getPassword() != null) { connectionListener.addCommand(RedisCommands.AUTH, config.getPassword()); diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 5dd0faa82..0409a635e 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -27,7 +27,7 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; import org.redisson.cluster.ClusterSlotRange; import org.redisson.connection.ConnectionEntry.FreezeReason; -import org.redisson.connection.ConnectionEntry.Mode; +import org.redisson.connection.ConnectionEntry.NodeType; import org.redisson.misc.ConnectionPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,9 +67,9 @@ public class MasterSlaveEntry { slaveBalancer.init(config, connectionManager, this); boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty(); - addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, Mode.MASTER); + addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER); for (URI address : config.getSlaveAddresses()) { - addSlave(address.getHost(), address.getPort(), false, Mode.SLAVE); + addSlave(address.getHost(), address.getPort(), false, NodeType.SLAVE); } writeConnectionHolder = new ConnectionPool(config, null, connectionManager, this); @@ -77,7 +77,7 @@ public class MasterSlaveEntry { public void setupMasterEntry(String host, int port) { RedisClient client = connectionManager.createClient(host, port); - masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0, connectListener, Mode.MASTER); + masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0, connectListener, NodeType.MASTER); writeConnectionHolder.add(masterEntry); } @@ -93,10 +93,10 @@ public class MasterSlaveEntry { } public void addSlave(String host, int port) { - addSlave(host, port, true, Mode.SLAVE); + addSlave(host, port, true, NodeType.SLAVE); } - private void addSlave(String host, int port, boolean freezed, Mode mode) { + private void addSlave(String host, int port, boolean freezed, NodeType mode) { RedisClient client = connectionManager.createClient(host, port); SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client, this.config.getSlaveConnectionPoolSize(), diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index 5f4e04494..68648ff41 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -22,7 +22,7 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; import org.redisson.cluster.ClusterSlotRange; -import org.redisson.connection.ConnectionEntry.Mode; +import org.redisson.connection.ConnectionEntry.NodeType; import org.redisson.misc.ConnectionPool; import org.redisson.misc.PubSubConnectionPoll; @@ -41,7 +41,7 @@ public class SingleEntry extends MasterSlaveEntry { public void setupMasterEntry(String host, int port) { RedisClient masterClient = connectionManager.createClient(host, port); masterEntry = new SubscribesConnectionEntry(masterClient, - config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize(), connectListener, Mode.MASTER); + config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize(), connectListener, NodeType.MASTER); writeConnectionHolder.add(masterEntry); pubSubConnectionHolder.add(masterEntry); } diff --git a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java index de3e043c2..136ad4638 100644 --- a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java +++ b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java @@ -32,7 +32,7 @@ public class SubscribesConnectionEntry extends ConnectionEntry { private final Queue freeSubscribeConnections = new ConcurrentLinkedQueue(); private final AtomicInteger connectionsCounter = new AtomicInteger(); - public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize, ConnectionListener connectListener, Mode serverMode) { + public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize, ConnectionListener connectListener, NodeType serverMode) { super(client, poolSize, connectListener, serverMode); connectionsCounter.set(subscribePoolSize); } diff --git a/src/main/java/org/redisson/misc/ConnectionPool.java b/src/main/java/org/redisson/misc/ConnectionPool.java index af7b2eaaa..1f8084996 100644 --- a/src/main/java/org/redisson/misc/ConnectionPool.java +++ b/src/main/java/org/redisson/misc/ConnectionPool.java @@ -25,7 +25,7 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.ConnectionEntry.FreezeReason; -import org.redisson.connection.ConnectionEntry.Mode; +import org.redisson.connection.ConnectionEntry.NodeType; import org.redisson.connection.ConnectionManager; import org.redisson.connection.LoadBalancer; import org.redisson.connection.MasterSlaveEntry; @@ -89,7 +89,8 @@ public class ConnectionPool { } public Future get(SubscribesConnectionEntry entry) { - if (!entry.isFreezed() && tryAcquireConnection(entry)) { + if ((entry.getNodeType() == NodeType.MASTER || !entry.isFreezed()) + && tryAcquireConnection(entry)) { Promise promise = connectionManager.newPromise(); connect(entry, promise); return promise; @@ -164,7 +165,7 @@ public class ConnectionPool { private void promiseFailure(SubscribesConnectionEntry entry, Promise promise, Throwable cause) { if (entry.incFailedAttempts() == config.getSlaveFailedAttempts() - && entry.getServerMode() == Mode.SLAVE) { + && entry.getNodeType() == NodeType.SLAVE) { connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); scheduleCheck(entry);