Redirect exceptions refactoring

pull/282/head
Nikita 9 years ago
parent 74283ca3bf
commit 14262bd25e

@ -15,25 +15,12 @@
*/
package org.redisson.client;
import java.net.InetSocketAddress;
import java.net.URI;
public class RedisAskException extends RedisException {
public class RedisAskException extends RedisRedirectException {
private static final long serialVersionUID = -6969734163155547631L;
private URI url;
public RedisAskException(String url) {
this.url = URI.create("//" + url);
}
public URI getUrl() {
return url;
}
public InetSocketAddress getAddr() {
return new InetSocketAddress(url.getHost(), url.getPort());
public RedisAskException(int slot, String url) {
super(slot, url);
}
}

@ -15,18 +15,12 @@
*/
package org.redisson.client;
public class RedisMovedException extends RedisException {
public class RedisMovedException extends RedisRedirectException {
private static final long serialVersionUID = -6969734163155547631L;
private int slot;
public RedisMovedException(int slot) {
this.slot = slot;
}
public int getSlot() {
return slot;
public RedisMovedException(int slot, String url) {
super(slot, url);
}
}

@ -0,0 +1,30 @@
package org.redisson.client;
import java.net.InetSocketAddress;
import java.net.URI;
class RedisRedirectException extends RedisException {
private static final long serialVersionUID = 181505625075250011L;
private int slot;
private URI url;
public RedisRedirectException(int slot, String url) {
this.slot = slot;
this.url = URI.create("//" + url);
}
public int getSlot() {
return slot;
}
public URI getUrl() {
return url;
}
public InetSocketAddress getAddr() {
return new InetSocketAddress(url.getHost(), url.getPort());
}
}

@ -165,10 +165,13 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (error.startsWith("MOVED")) {
String[] errorParts = error.split(" ");
int slot = Integer.valueOf(errorParts[1]);
data.getPromise().setFailure(new RedisMovedException(slot));
String addr = errorParts[2];
data.getPromise().setFailure(new RedisMovedException(slot, addr));
} else if (error.startsWith("ASK")) {
String[] errorParts = error.split(" ");
data.getPromise().setFailure(new RedisAskException(errorParts[2]));
int slot = Integer.valueOf(errorParts[1]);
String addr = errorParts[2];
data.getPromise().setFailure(new RedisAskException(slot, addr));
} else {
data.getPromise().setFailure(new RedisException(error + ". channel: " + channel + " command: " + data));
}

@ -18,7 +18,7 @@ package org.redisson.cluster;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.connection.ConnectionEntry.NodeType;
import org.redisson.connection.DefaultConnectionListener;
import org.redisson.connection.FutureConnectionListener;
@ -31,9 +31,9 @@ public class ClusterConnectionListener extends DefaultConnectionListener {
}
@Override
public void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener) throws RedisException {
public void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener) throws RedisException {
super.onConnect(config, serverMode, connectionListener);
if (serverMode == Mode.SLAVE && readFromSlaves) {
if (serverMode == NodeType.SLAVE && readFromSlaves) {
connectionListener.addCommand(RedisCommands.READONLY);
}
}

@ -41,23 +41,23 @@ public class ConnectionEntry {
private FreezeReason freezeReason;
final RedisClient client;
public enum Mode {SLAVE, MASTER}
public enum NodeType {SLAVE, MASTER}
private final Mode serverMode;
private final NodeType nodeType;
private final ConnectionListener connectListener;
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
private final AtomicInteger connectionsCounter = new AtomicInteger();
private AtomicInteger failedAttempts = new AtomicInteger();
public ConnectionEntry(RedisClient client, int poolSize, ConnectionListener connectListener, Mode serverMode) {
public ConnectionEntry(RedisClient client, int poolSize, ConnectionListener connectListener, NodeType nodeType) {
this.client = client;
this.connectionsCounter.set(poolSize);
this.connectListener = connectListener;
this.serverMode = serverMode;
this.nodeType = nodeType;
}
public Mode getServerMode() {
return serverMode;
public NodeType getNodeType() {
return nodeType;
}
public void resetFailedAttempts() {
@ -129,7 +129,7 @@ public class ConnectionEntry {
log.debug("new connection created: {}", conn);
FutureConnectionListener<RedisConnection> listener = new FutureConnectionListener<RedisConnection>(connectionFuture, conn);
connectListener.onConnect(config, serverMode, listener);
connectListener.onConnect(config, nodeType, listener);
listener.executeCommands();
addReconnectListener(config, conn);
@ -144,7 +144,7 @@ public class ConnectionEntry {
@Override
public void onReconnect(RedisConnection conn, Promise<RedisConnection> connectionFuture) {
FutureConnectionListener<RedisConnection> listener = new FutureConnectionListener<RedisConnection>(connectionFuture, conn);
connectListener.onConnect(config, serverMode, listener);
connectListener.onConnect(config, nodeType, listener);
listener.executeCommands();
}
});
@ -163,7 +163,7 @@ public class ConnectionEntry {
log.debug("new pubsub connection created: {}", conn);
FutureConnectionListener<RedisPubSubConnection> listener = new FutureConnectionListener<RedisPubSubConnection>(connectionFuture, conn);
connectListener.onConnect(config, serverMode, listener);
connectListener.onConnect(config, nodeType, listener);
listener.executeCommands();
addReconnectListener(config, conn);

@ -17,10 +17,10 @@ package org.redisson.connection;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisException;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.connection.ConnectionEntry.NodeType;
public interface ConnectionListener {
void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener) throws RedisException;
void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener) throws RedisException;
}

@ -18,12 +18,12 @@ package org.redisson.connection;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.connection.ConnectionEntry.NodeType;
public class DefaultConnectionListener implements ConnectionListener {
@Override
public void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener)
public void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener)
throws RedisException {
if (config.getPassword() != null) {
connectionListener.addCommand(RedisCommands.AUTH, config.getPassword());

@ -27,7 +27,7 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ConnectionEntry.FreezeReason;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.connection.ConnectionEntry.NodeType;
import org.redisson.misc.ConnectionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -67,9 +67,9 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
slaveBalancer.init(config, connectionManager, this);
boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty();
addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, Mode.MASTER);
addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER);
for (URI address : config.getSlaveAddresses()) {
addSlave(address.getHost(), address.getPort(), false, Mode.SLAVE);
addSlave(address.getHost(), address.getPort(), false, NodeType.SLAVE);
}
writeConnectionHolder = new ConnectionPool<RedisConnection>(config, null, connectionManager, this);
@ -77,7 +77,7 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
public void setupMasterEntry(String host, int port) {
RedisClient client = connectionManager.createClient(host, port);
masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0, connectListener, Mode.MASTER);
masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0, connectListener, NodeType.MASTER);
writeConnectionHolder.add(masterEntry);
}
@ -93,10 +93,10 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
}
public void addSlave(String host, int port) {
addSlave(host, port, true, Mode.SLAVE);
addSlave(host, port, true, NodeType.SLAVE);
}
private void addSlave(String host, int port, boolean freezed, Mode mode) {
private void addSlave(String host, int port, boolean freezed, NodeType mode) {
RedisClient client = connectionManager.createClient(host, port);
SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client,
this.config.getSlaveConnectionPoolSize(),

@ -22,7 +22,7 @@ import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.connection.ConnectionEntry.NodeType;
import org.redisson.misc.ConnectionPool;
import org.redisson.misc.PubSubConnectionPoll;
@ -41,7 +41,7 @@ public class SingleEntry extends MasterSlaveEntry<SubscribesConnectionEntry> {
public void setupMasterEntry(String host, int port) {
RedisClient masterClient = connectionManager.createClient(host, port);
masterEntry = new SubscribesConnectionEntry(masterClient,
config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize(), connectListener, Mode.MASTER);
config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize(), connectListener, NodeType.MASTER);
writeConnectionHolder.add(masterEntry);
pubSubConnectionHolder.add(masterEntry);
}

@ -32,7 +32,7 @@ public class SubscribesConnectionEntry extends ConnectionEntry {
private final Queue<RedisPubSubConnection> freeSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final AtomicInteger connectionsCounter = new AtomicInteger();
public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize, ConnectionListener connectListener, Mode serverMode) {
public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize, ConnectionListener connectListener, NodeType serverMode) {
super(client, poolSize, connectListener, serverMode);
connectionsCounter.set(subscribePoolSize);
}

@ -25,7 +25,7 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionEntry.FreezeReason;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.connection.ConnectionEntry.NodeType;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.LoadBalancer;
import org.redisson.connection.MasterSlaveEntry;
@ -89,7 +89,8 @@ public class ConnectionPool<T extends RedisConnection> {
}
public Future<T> get(SubscribesConnectionEntry entry) {
if (!entry.isFreezed() && tryAcquireConnection(entry)) {
if ((entry.getNodeType() == NodeType.MASTER || !entry.isFreezed())
&& tryAcquireConnection(entry)) {
Promise<T> promise = connectionManager.newPromise();
connect(entry, promise);
return promise;
@ -164,7 +165,7 @@ public class ConnectionPool<T extends RedisConnection> {
private void promiseFailure(SubscribesConnectionEntry entry, Promise<T> promise, Throwable cause) {
if (entry.incFailedAttempts() == config.getSlaveFailedAttempts()
&& entry.getServerMode() == Mode.SLAVE) {
&& entry.getNodeType() == NodeType.SLAVE) {
connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(),
entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
scheduleCheck(entry);

Loading…
Cancel
Save