ClusterServersConfig.readFromSlaves param added. #272

pull/282/head
Nikita 9 years ago
parent 7d940c012a
commit 68f617880a

@ -16,7 +16,6 @@
package org.redisson; package org.redisson;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -34,6 +33,8 @@ public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterSe
*/ */
private int scanInterval = 1000; private int scanInterval = 1000;
private boolean readFromSlaves = true;
public ClusterServersConfig() { public ClusterServersConfig() {
} }
@ -41,6 +42,7 @@ public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterSe
super(config); super(config);
setNodeAddresses(config.getNodeAddresses()); setNodeAddresses(config.getNodeAddresses());
setScanInterval(config.getScanInterval()); setScanInterval(config.getScanInterval());
setReadFromSlaves(config.isReadFromSlaves());
} }
/** /**
@ -76,6 +78,18 @@ public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterSe
return this; return this;
} }
public boolean isReadFromSlaves() {
return readFromSlaves;
}
/**
* Use cluster slaves for read-operations
*
* @param readFromSlaves
* @return
*/
public ClusterServersConfig setReadFromSlaves(boolean readFromSlaves) {
this.readFromSlaves = readFromSlaves;
return this;
}
} }

@ -50,6 +50,7 @@ import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
public interface RedisCommands { public interface RedisCommands {
RedisStrictCommand<Void> ASKING = new RedisStrictCommand<Void>("ASKING", new VoidReplayConvertor()); RedisStrictCommand<Void> ASKING = new RedisStrictCommand<Void>("ASKING", new VoidReplayConvertor());
RedisStrictCommand<Void> READONLY = new RedisStrictCommand<Void>("READONLY", new VoidReplayConvertor());
RedisCommand<Boolean> ZADD = new RedisCommand<Boolean>("ZADD", new BooleanAmountReplayConvertor(), 3); RedisCommand<Boolean> ZADD = new RedisCommand<Boolean>("ZADD", new BooleanAmountReplayConvertor(), 3);
RedisCommand<Boolean> ZREM = new RedisCommand<Boolean>("ZREM", new BooleanAmountReplayConvertor(), 2); RedisCommand<Boolean> ZREM = new RedisCommand<Boolean>("ZREM", new BooleanAmountReplayConvertor(), 2);

@ -0,0 +1,26 @@
package org.redisson.cluster;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.DefaultConnectionListener;
import org.redisson.connection.ConnectionEntry.Mode;
public class ClusterConnectionListener extends DefaultConnectionListener {
private final boolean readFromSlaves;
public ClusterConnectionListener(boolean readFromSlaves) {
this.readFromSlaves = readFromSlaves;
}
@Override
public void onConnect(MasterSlaveServersConfig config, RedisConnection conn, Mode serverMode) throws RedisException {
super.onConnect(config, conn, serverMode);
if (serverMode == Mode.SLAVE && readFromSlaves) {
conn.sync(RedisCommands.READONLY);
}
}
}

@ -36,7 +36,6 @@ import org.redisson.cluster.ClusterNodeInfo.Flag;
import org.redisson.connection.CRC16; import org.redisson.connection.CRC16;
import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.SingleEntry;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -54,6 +53,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private ScheduledFuture<?> monitorFuture; private ScheduledFuture<?> monitorFuture;
public ClusterConnectionManager(ClusterServersConfig cfg, Config config) { public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
connectListener = new ClusterConnectionListener(cfg.isReadFromSlaves());
init(config); init(config);
this.config = create(cfg); this.config = create(cfg);
@ -127,9 +127,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
config.setMasterAddress(partition.getMasterAddress()); config.setMasterAddress(partition.getMasterAddress());
config.setSlaveAddresses(partition.getSlaveAddresses()); config.setSlaveAddresses(partition.getSlaveAddresses());
log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges()); log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());
SingleEntry entry = new SingleEntry(partition.getSlotRanges(), this, config); MasterSlaveEntry entry = new MasterSlaveEntry(partition.getSlotRanges(), this, config, connectListener);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
for (ClusterSlotRange slotRange : partition.getSlotRanges()) { for (ClusterSlotRange slotRange : partition.getSlotRanges()) {
addEntry(slotRange, entry); addEntry(slotRange, entry);

@ -38,12 +38,18 @@ public class ConnectionEntry {
private volatile boolean freezed; private volatile boolean freezed;
final RedisClient client; final RedisClient client;
public enum Mode {SLAVE, MASTER}
private final Mode serverMode;
private final ConnectionListener connectListener;
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>(); private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
private final AtomicInteger connectionsCounter = new AtomicInteger(); private final AtomicInteger connectionsCounter = new AtomicInteger();
public ConnectionEntry(RedisClient client, int poolSize) { public ConnectionEntry(RedisClient client, int poolSize, ConnectionListener connectListener, Mode serverMode) {
this.client = client; this.client = client;
this.connectionsCounter.set(poolSize); this.connectionsCounter.set(poolSize);
this.connectListener = connectListener;
this.serverMode = serverMode;
} }
public RedisClient getClient() { public RedisClient getClient() {
@ -67,7 +73,8 @@ public class ConnectionEntry {
if (connectionsCounter.get() == 0) { if (connectionsCounter.get() == 0) {
return false; return false;
} }
if (connectionsCounter.compareAndSet(connectionsCounter.get(), connectionsCounter.get() - 1)) { int value = connectionsCounter.get();
if (connectionsCounter.compareAndSet(value, value - 1)) {
return true; return true;
} }
} }
@ -96,11 +103,11 @@ public class ConnectionEntry {
RedisConnection conn = future.getNow(); RedisConnection conn = future.getNow();
log.debug("new connection created: {}", conn); log.debug("new connection created: {}", conn);
prepareConnection(config, conn); connectListener.onConnect(config, conn, serverMode);
conn.setReconnectListener(new ReconnectListener() { conn.setReconnectListener(new ReconnectListener() {
@Override @Override
public void onReconnect(RedisConnection conn) { public void onReconnect(RedisConnection conn) {
prepareConnection(config, conn); connectListener.onConnect(config, conn, serverMode);
} }
}); });
} }
@ -131,11 +138,11 @@ public class ConnectionEntry {
RedisPubSubConnection conn = future.getNow(); RedisPubSubConnection conn = future.getNow();
log.debug("new pubsub connection created: {}", conn); log.debug("new pubsub connection created: {}", conn);
prepareConnection(config, conn); connectListener.onConnect(config, conn, serverMode);
conn.setReconnectListener(new ReconnectListener() { conn.setReconnectListener(new ReconnectListener() {
@Override @Override
public void onReconnect(RedisConnection conn) { public void onReconnect(RedisConnection conn) {
prepareConnection(config, conn); connectListener.onConnect(config, conn, serverMode);
} }
}); });
} }

@ -0,0 +1,12 @@
package org.redisson.connection;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.connection.ConnectionEntry.Mode;
public interface ConnectionListener {
void onConnect(MasterSlaveServersConfig config, RedisConnection redisConnection, Mode serverMode) throws RedisException;
}

@ -0,0 +1,25 @@
package org.redisson.connection;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionEntry.Mode;
public class DefaultConnectionListener implements ConnectionListener {
@Override
public void onConnect(MasterSlaveServersConfig config, RedisConnection conn, Mode serverMode)
throws RedisException {
if (config.getPassword() != null) {
conn.sync(RedisCommands.AUTH, config.getPassword());
}
if (config.getDatabase() != 0) {
conn.sync(RedisCommands.SELECT, config.getDatabase());
}
if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
}
}
}

@ -74,6 +74,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected EventLoopGroup group; protected EventLoopGroup group;
protected ConnectionListener connectListener = new DefaultConnectionListener();
protected Class<? extends SocketChannel> socketChannelClass; protected Class<? extends SocketChannel> socketChannelClass;
@ -135,7 +136,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected void initEntry(MasterSlaveServersConfig config) { protected void initEntry(MasterSlaveServersConfig config) {
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>(); HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange); slots.add(singleSlotRange);
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config); MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config, connectListener);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
addEntry(singleSlotRange, entry); addEntry(singleSlotRange, entry);
} }

@ -17,9 +17,7 @@ package org.redisson.connection;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -28,6 +26,7 @@ import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.misc.ConnectionPool; import org.redisson.misc.ConnectionPool;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -47,6 +46,8 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
LoadBalancer slaveBalancer; LoadBalancer slaveBalancer;
SubscribesConnectionEntry masterEntry; SubscribesConnectionEntry masterEntry;
final ConnectionListener connectListener;
final MasterSlaveServersConfig config; final MasterSlaveServersConfig config;
final ConnectionManager connectionManager; final ConnectionManager connectionManager;
@ -55,32 +56,27 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
final AtomicBoolean active = new AtomicBoolean(true); final AtomicBoolean active = new AtomicBoolean(true);
public MasterSlaveEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { public MasterSlaveEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config, ConnectionListener connectListener) {
this.slotRanges = slotRanges; this.slotRanges = slotRanges;
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
this.config = config; this.config = config;
this.connectListener = connectListener;
slaveBalancer = config.getLoadBalancer(); slaveBalancer = config.getLoadBalancer();
slaveBalancer.init(config, connectionManager); slaveBalancer.init(config, connectionManager);
List<URI> addresses = new ArrayList<URI>(config.getSlaveAddresses()); boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty();
addresses.add(config.getMasterAddress()); addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, Mode.MASTER);
for (URI address : addresses) { for (URI address : config.getSlaveAddresses()) {
RedisClient client = connectionManager.createClient(address.getHost(), address.getPort()); addSlave(address.getHost(), address.getPort(), false, Mode.SLAVE);
slaveBalancer.add(new SubscribesConnectionEntry(client,
this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionPoolSize()));
}
if (!config.getSlaveAddresses().isEmpty()) {
slaveDown(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
} }
writeConnectionHolder = new ConnectionPool<RedisConnection>(config, null, connectionManager.getGroup()); writeConnectionHolder = new ConnectionPool<RedisConnection>(config, null, connectionManager.getGroup());
} }
protected void setupMasterEntry(String host, int port) { public void setupMasterEntry(String host, int port) {
RedisClient client = connectionManager.createClient(host, port); RedisClient client = connectionManager.createClient(host, port);
masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0); masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0, connectListener, Mode.MASTER);
writeConnectionHolder.add(masterEntry); writeConnectionHolder.add(masterEntry);
} }
@ -95,11 +91,15 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
} }
public void addSlave(String host, int port) { public void addSlave(String host, int port) {
addSlave(host, port, true, Mode.SLAVE);
}
private void addSlave(String host, int port, boolean freezed, Mode mode) {
RedisClient client = connectionManager.createClient(host, port); RedisClient client = connectionManager.createClient(host, port);
SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client, SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client,
this.config.getSlaveConnectionPoolSize(), this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionPoolSize()); this.config.getSlaveSubscriptionConnectionPoolSize(), connectListener, mode);
entry.setFreezed(true); entry.setFreezed(freezed);
slaveBalancer.add(entry); slaveBalancer.add(entry);
} }

@ -74,7 +74,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
protected void initEntry(MasterSlaveServersConfig config) { protected void initEntry(MasterSlaveServersConfig config) {
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>(); HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange); slots.add(singleSlotRange);
SingleEntry entry = new SingleEntry(slots, this, config); SingleEntry entry = new SingleEntry(slots, this, config, connectListener);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
addEntry(singleSlotRange, entry); addEntry(singleSlotRange, entry);
} }

@ -22,6 +22,7 @@ import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.misc.ConnectionPool; import org.redisson.misc.ConnectionPool;
import org.redisson.misc.PubSubConnectionPoll; import org.redisson.misc.PubSubConnectionPoll;
@ -31,15 +32,16 @@ public class SingleEntry extends MasterSlaveEntry<SubscribesConnectionEntry> {
final ConnectionPool<RedisPubSubConnection> pubSubConnectionHolder; final ConnectionPool<RedisPubSubConnection> pubSubConnectionHolder;
public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config, ConnectionListener connectListener) {
super(slotRanges, connectionManager, config); super(slotRanges, connectionManager, config, connectListener);
pubSubConnectionHolder = new PubSubConnectionPoll(config, null, connectionManager.getGroup()); pubSubConnectionHolder = new PubSubConnectionPoll(config, null, connectionManager.getGroup());
} }
@Override @Override
public void setupMasterEntry(String host, int port) { public void setupMasterEntry(String host, int port) {
RedisClient masterClient = connectionManager.createClient(host, port); RedisClient masterClient = connectionManager.createClient(host, port);
masterEntry = new SubscribesConnectionEntry(masterClient, config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize()); masterEntry = new SubscribesConnectionEntry(masterClient,
config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize(), connectListener, Mode.MASTER);
writeConnectionHolder.add(masterEntry); writeConnectionHolder.add(masterEntry);
pubSubConnectionHolder.add(masterEntry); pubSubConnectionHolder.add(masterEntry);
} }

@ -32,8 +32,8 @@ public class SubscribesConnectionEntry extends ConnectionEntry {
private final Queue<RedisPubSubConnection> freeSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>(); private final Queue<RedisPubSubConnection> freeSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final AtomicInteger connectionsCounter = new AtomicInteger(); private final AtomicInteger connectionsCounter = new AtomicInteger();
public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize) { public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize, ConnectionListener connectListener, Mode serverMode) {
super(client, poolSize); super(client, poolSize, connectListener, serverMode);
connectionsCounter.set(subscribePoolSize); connectionsCounter.set(subscribePoolSize);
} }
@ -58,7 +58,8 @@ public class SubscribesConnectionEntry extends ConnectionEntry {
if (connectionsCounter.get() == 0) { if (connectionsCounter.get() == 0) {
return false; return false;
} }
if (connectionsCounter.compareAndSet(connectionsCounter.get(), connectionsCounter.get() - 1)) { int value = connectionsCounter.get();
if (connectionsCounter.compareAndSet(value, value - 1)) {
return true; return true;
} }
} }

Loading…
Cancel
Save