From 68f617880a2b590977c745c2b8f1dd3929f66908 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 3 Nov 2015 20:37:38 +0300 Subject: [PATCH] ClusterServersConfig.readFromSlaves param added. #272 --- .../org/redisson/ClusterServersConfig.java | 18 ++++++++-- .../client/protocol/RedisCommands.java | 1 + .../cluster/ClusterConnectionListener.java | 26 ++++++++++++++ .../cluster/ClusterConnectionManager.java | 5 +-- .../redisson/connection/ConnectionEntry.java | 19 +++++++---- .../connection/ConnectionListener.java | 12 +++++++ .../connection/DefaultConnectionListener.java | 25 ++++++++++++++ .../MasterSlaveConnectionManager.java | 3 +- .../redisson/connection/MasterSlaveEntry.java | 34 +++++++++---------- .../connection/SingleConnectionManager.java | 2 +- .../org/redisson/connection/SingleEntry.java | 8 +++-- .../connection/SubscribesConnectionEntry.java | 7 ++-- 12 files changed, 125 insertions(+), 35 deletions(-) create mode 100644 src/main/java/org/redisson/cluster/ClusterConnectionListener.java create mode 100644 src/main/java/org/redisson/connection/ConnectionListener.java create mode 100644 src/main/java/org/redisson/connection/DefaultConnectionListener.java diff --git a/src/main/java/org/redisson/ClusterServersConfig.java b/src/main/java/org/redisson/ClusterServersConfig.java index 41bb678cb..65e3dbdb0 100644 --- a/src/main/java/org/redisson/ClusterServersConfig.java +++ b/src/main/java/org/redisson/ClusterServersConfig.java @@ -16,7 +16,6 @@ package org.redisson; import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; @@ -34,6 +33,8 @@ public class ClusterServersConfig extends BaseMasterSlaveServersConfig ASKING = new RedisStrictCommand("ASKING", new VoidReplayConvertor()); + RedisStrictCommand READONLY = new RedisStrictCommand("READONLY", new VoidReplayConvertor()); RedisCommand ZADD = new RedisCommand("ZADD", new BooleanAmountReplayConvertor(), 3); RedisCommand ZREM = new RedisCommand("ZREM", new BooleanAmountReplayConvertor(), 2); diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionListener.java b/src/main/java/org/redisson/cluster/ClusterConnectionListener.java new file mode 100644 index 000000000..cf1731883 --- /dev/null +++ b/src/main/java/org/redisson/cluster/ClusterConnectionListener.java @@ -0,0 +1,26 @@ +package org.redisson.cluster; + +import org.redisson.MasterSlaveServersConfig; +import org.redisson.client.RedisConnection; +import org.redisson.client.RedisException; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.connection.DefaultConnectionListener; +import org.redisson.connection.ConnectionEntry.Mode; + +public class ClusterConnectionListener extends DefaultConnectionListener { + + private final boolean readFromSlaves; + + public ClusterConnectionListener(boolean readFromSlaves) { + this.readFromSlaves = readFromSlaves; + } + + @Override + public void onConnect(MasterSlaveServersConfig config, RedisConnection conn, Mode serverMode) throws RedisException { + super.onConnect(config, conn, serverMode); + if (serverMode == Mode.SLAVE && readFromSlaves) { + conn.sync(RedisCommands.READONLY); + } + } + +} diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 1fb6b6d95..11531a6c4 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -36,7 +36,6 @@ import org.redisson.cluster.ClusterNodeInfo.Flag; import org.redisson.connection.CRC16; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.MasterSlaveEntry; -import org.redisson.connection.SingleEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +53,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private ScheduledFuture monitorFuture; public ClusterConnectionManager(ClusterServersConfig cfg, Config config) { + connectListener = new ClusterConnectionListener(cfg.isReadFromSlaves()); init(config); this.config = create(cfg); @@ -127,9 +127,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); config.setMasterAddress(partition.getMasterAddress()); config.setSlaveAddresses(partition.getSlaveAddresses()); + log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges()); - SingleEntry entry = new SingleEntry(partition.getSlotRanges(), this, config); + MasterSlaveEntry entry = new MasterSlaveEntry(partition.getSlotRanges(), this, config, connectListener); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); for (ClusterSlotRange slotRange : partition.getSlotRanges()) { addEntry(slotRange, entry); diff --git a/src/main/java/org/redisson/connection/ConnectionEntry.java b/src/main/java/org/redisson/connection/ConnectionEntry.java index 5f8312568..6fdab9035 100644 --- a/src/main/java/org/redisson/connection/ConnectionEntry.java +++ b/src/main/java/org/redisson/connection/ConnectionEntry.java @@ -38,12 +38,18 @@ public class ConnectionEntry { private volatile boolean freezed; final RedisClient client; + public enum Mode {SLAVE, MASTER} + + private final Mode serverMode; + private final ConnectionListener connectListener; private final Queue connections = new ConcurrentLinkedQueue(); private final AtomicInteger connectionsCounter = new AtomicInteger(); - public ConnectionEntry(RedisClient client, int poolSize) { + public ConnectionEntry(RedisClient client, int poolSize, ConnectionListener connectListener, Mode serverMode) { this.client = client; this.connectionsCounter.set(poolSize); + this.connectListener = connectListener; + this.serverMode = serverMode; } public RedisClient getClient() { @@ -67,7 +73,8 @@ public class ConnectionEntry { if (connectionsCounter.get() == 0) { return false; } - if (connectionsCounter.compareAndSet(connectionsCounter.get(), connectionsCounter.get() - 1)) { + int value = connectionsCounter.get(); + if (connectionsCounter.compareAndSet(value, value - 1)) { return true; } } @@ -96,11 +103,11 @@ public class ConnectionEntry { RedisConnection conn = future.getNow(); log.debug("new connection created: {}", conn); - prepareConnection(config, conn); + connectListener.onConnect(config, conn, serverMode); conn.setReconnectListener(new ReconnectListener() { @Override public void onReconnect(RedisConnection conn) { - prepareConnection(config, conn); + connectListener.onConnect(config, conn, serverMode); } }); } @@ -131,11 +138,11 @@ public class ConnectionEntry { RedisPubSubConnection conn = future.getNow(); log.debug("new pubsub connection created: {}", conn); - prepareConnection(config, conn); + connectListener.onConnect(config, conn, serverMode); conn.setReconnectListener(new ReconnectListener() { @Override public void onReconnect(RedisConnection conn) { - prepareConnection(config, conn); + connectListener.onConnect(config, conn, serverMode); } }); } diff --git a/src/main/java/org/redisson/connection/ConnectionListener.java b/src/main/java/org/redisson/connection/ConnectionListener.java new file mode 100644 index 000000000..9c331b298 --- /dev/null +++ b/src/main/java/org/redisson/connection/ConnectionListener.java @@ -0,0 +1,12 @@ +package org.redisson.connection; + +import org.redisson.MasterSlaveServersConfig; +import org.redisson.client.RedisConnection; +import org.redisson.client.RedisException; +import org.redisson.connection.ConnectionEntry.Mode; + +public interface ConnectionListener { + + void onConnect(MasterSlaveServersConfig config, RedisConnection redisConnection, Mode serverMode) throws RedisException; + +} diff --git a/src/main/java/org/redisson/connection/DefaultConnectionListener.java b/src/main/java/org/redisson/connection/DefaultConnectionListener.java new file mode 100644 index 000000000..869bec235 --- /dev/null +++ b/src/main/java/org/redisson/connection/DefaultConnectionListener.java @@ -0,0 +1,25 @@ +package org.redisson.connection; + +import org.redisson.MasterSlaveServersConfig; +import org.redisson.client.RedisConnection; +import org.redisson.client.RedisException; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.connection.ConnectionEntry.Mode; + +public class DefaultConnectionListener implements ConnectionListener { + + @Override + public void onConnect(MasterSlaveServersConfig config, RedisConnection conn, Mode serverMode) + throws RedisException { + if (config.getPassword() != null) { + conn.sync(RedisCommands.AUTH, config.getPassword()); + } + if (config.getDatabase() != 0) { + conn.sync(RedisCommands.SELECT, config.getDatabase()); + } + if (config.getClientName() != null) { + conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); + } + } + +} diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index d381eee1a..ab76cf71d 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -74,6 +74,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected EventLoopGroup group; + protected ConnectionListener connectListener = new DefaultConnectionListener(); protected Class socketChannelClass; @@ -135,7 +136,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected void initEntry(MasterSlaveServersConfig config) { HashSet slots = new HashSet(); slots.add(singleSlotRange); - MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config); + MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config, connectListener); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); addEntry(singleSlotRange, entry); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 64562a850..4ff8abb9a 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -17,9 +17,7 @@ package org.redisson.connection; import java.net.InetSocketAddress; import java.net.URI; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -28,6 +26,7 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; import org.redisson.cluster.ClusterSlotRange; +import org.redisson.connection.ConnectionEntry.Mode; import org.redisson.misc.ConnectionPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +46,8 @@ public class MasterSlaveEntry { LoadBalancer slaveBalancer; SubscribesConnectionEntry masterEntry; + final ConnectionListener connectListener; + final MasterSlaveServersConfig config; final ConnectionManager connectionManager; @@ -55,32 +56,27 @@ public class MasterSlaveEntry { final AtomicBoolean active = new AtomicBoolean(true); - public MasterSlaveEntry(Set slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { + public MasterSlaveEntry(Set slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config, ConnectionListener connectListener) { this.slotRanges = slotRanges; this.connectionManager = connectionManager; this.config = config; + this.connectListener = connectListener; slaveBalancer = config.getLoadBalancer(); slaveBalancer.init(config, connectionManager); - List addresses = new ArrayList(config.getSlaveAddresses()); - addresses.add(config.getMasterAddress()); - for (URI address : addresses) { - RedisClient client = connectionManager.createClient(address.getHost(), address.getPort()); - slaveBalancer.add(new SubscribesConnectionEntry(client, - this.config.getSlaveConnectionPoolSize(), - this.config.getSlaveSubscriptionConnectionPoolSize())); - } - if (!config.getSlaveAddresses().isEmpty()) { - slaveDown(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); + boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty(); + addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, Mode.MASTER); + for (URI address : config.getSlaveAddresses()) { + addSlave(address.getHost(), address.getPort(), false, Mode.SLAVE); } writeConnectionHolder = new ConnectionPool(config, null, connectionManager.getGroup()); } - protected void setupMasterEntry(String host, int port) { + public void setupMasterEntry(String host, int port) { RedisClient client = connectionManager.createClient(host, port); - masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0); + masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0, connectListener, Mode.MASTER); writeConnectionHolder.add(masterEntry); } @@ -95,11 +91,15 @@ public class MasterSlaveEntry { } public void addSlave(String host, int port) { + addSlave(host, port, true, Mode.SLAVE); + } + + private void addSlave(String host, int port, boolean freezed, Mode mode) { RedisClient client = connectionManager.createClient(host, port); SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client, this.config.getSlaveConnectionPoolSize(), - this.config.getSlaveSubscriptionConnectionPoolSize()); - entry.setFreezed(true); + this.config.getSlaveSubscriptionConnectionPoolSize(), connectListener, mode); + entry.setFreezed(freezed); slaveBalancer.add(entry); } diff --git a/src/main/java/org/redisson/connection/SingleConnectionManager.java b/src/main/java/org/redisson/connection/SingleConnectionManager.java index 26f995a3b..57a95fd06 100644 --- a/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -74,7 +74,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { protected void initEntry(MasterSlaveServersConfig config) { HashSet slots = new HashSet(); slots.add(singleSlotRange); - SingleEntry entry = new SingleEntry(slots, this, config); + SingleEntry entry = new SingleEntry(slots, this, config, connectListener); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); addEntry(singleSlotRange, entry); } diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index b5095f6f3..d3f9f39f6 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -22,6 +22,7 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; import org.redisson.cluster.ClusterSlotRange; +import org.redisson.connection.ConnectionEntry.Mode; import org.redisson.misc.ConnectionPool; import org.redisson.misc.PubSubConnectionPoll; @@ -31,15 +32,16 @@ public class SingleEntry extends MasterSlaveEntry { final ConnectionPool pubSubConnectionHolder; - public SingleEntry(Set slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { - super(slotRanges, connectionManager, config); + public SingleEntry(Set slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config, ConnectionListener connectListener) { + super(slotRanges, connectionManager, config, connectListener); pubSubConnectionHolder = new PubSubConnectionPoll(config, null, connectionManager.getGroup()); } @Override public void setupMasterEntry(String host, int port) { RedisClient masterClient = connectionManager.createClient(host, port); - masterEntry = new SubscribesConnectionEntry(masterClient, config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize()); + masterEntry = new SubscribesConnectionEntry(masterClient, + config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize(), connectListener, Mode.MASTER); writeConnectionHolder.add(masterEntry); pubSubConnectionHolder.add(masterEntry); } diff --git a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java index 4a2c88982..de3e043c2 100644 --- a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java +++ b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java @@ -32,8 +32,8 @@ public class SubscribesConnectionEntry extends ConnectionEntry { private final Queue freeSubscribeConnections = new ConcurrentLinkedQueue(); private final AtomicInteger connectionsCounter = new AtomicInteger(); - public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize) { - super(client, poolSize); + public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize, ConnectionListener connectListener, Mode serverMode) { + super(client, poolSize, connectListener, serverMode); connectionsCounter.set(subscribePoolSize); } @@ -58,7 +58,8 @@ public class SubscribesConnectionEntry extends ConnectionEntry { if (connectionsCounter.get() == 0) { return false; } - if (connectionsCounter.compareAndSet(connectionsCounter.get(), connectionsCounter.get() - 1)) { + int value = connectionsCounter.get(); + if (connectionsCounter.compareAndSet(value, value - 1)) { return true; } }