ReadMode.MASTER_SLAVE added. #380

pull/382/head
Nikita 9 years ago
parent 91e4d7e4e4
commit ade8ef7a05

@ -25,6 +25,11 @@ public enum ReadMode {
/**
* Read from master node
*/
MASTER
MASTER,
/**
* Read from master and slave nodes
*/
MASTER_SLAVE,
}

@ -200,7 +200,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
config.setSlaveAddresses(partition.getSlaveAddresses());
e = new MasterSlaveEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config);
List<Future<Void>> fs = e.initSlaveBalancer(config);
List<Future<Void>> fs = e.initSlaveBalancer();
futures.addAll(fs);
if (!partition.getSlaveAddresses().isEmpty()) {

@ -195,20 +195,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
protected void initEntry(MasterSlaveServersConfig config) {
if (config.getReadMode() == ReadMode.MASTER) {
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange);
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange);
if (config.getReadMode() == ReadMode.MASTER) {
SingleEntry entry = new SingleEntry(slots, this, config);
Future<Void> f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
f.syncUninterruptibly();
addEntry(singleSlotRange, entry);
} else {
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange);
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
List<Future<Void>> fs = entry.initSlaveBalancer(config);
List<Future<Void>> fs = entry.initSlaveBalancer();
Future<Void> f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
fs.add(f);
for (Future<Void> future : fs) {

@ -24,6 +24,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.ReadMode;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
@ -67,8 +68,8 @@ public class MasterSlaveEntry {
writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this);
}
public List<Future<Void>> initSlaveBalancer(MasterSlaveServersConfig config) {
boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty();
public List<Future<Void>> initSlaveBalancer() {
boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty() && config.getReadMode() == ReadMode.SLAVE;
List<Future<Void>> result = new LinkedList<Future<Void>>();
Future<Void> f = addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER);
@ -89,8 +90,9 @@ public class MasterSlaveEntry {
public Collection<RedisPubSubConnection> slaveDown(String host, int port, FreezeReason freezeReason) {
Collection<RedisPubSubConnection> conns = slaveBalancer.freeze(host, port, freezeReason);
// add master as slave if no more slaves available
if (slaveBalancer.getAvailableClients() == 0) {
if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) {
InetSocketAddress addr = masterEntry.getClient().getAddr();
if (slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM)) {
log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort());
@ -128,7 +130,8 @@ public class MasterSlaveEntry {
InetSocketAddress addr = masterEntry.getClient().getAddr();
// exclude master from slaves
if (!addr.getHostName().equals(host) || port != addr.getPort()) {
if (config.getReadMode() == ReadMode.SLAVE
&& (!addr.getHostName().equals(host) || port != addr.getPort())) {
connectionManager.slaveDown(this, addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM);
}
return true;
@ -146,8 +149,9 @@ public class MasterSlaveEntry {
writeConnectionHolder.remove(oldMaster);
oldMaster.freezeMaster(FreezeReason.MANAGER);
if (slaveBalancer.getAvailableClients() > 1) {
// more than one slave available, so master could be removed from slaves
// more than one slave available, so master can be removed from slaves
if (config.getReadMode() == ReadMode.SLAVE
&& slaveBalancer.getAvailableClients() > 1) {
connectionManager.slaveDown(this, host, port, FreezeReason.SYSTEM);
}
connectionManager.shutdownAsync(oldMaster.getClient());

Loading…
Cancel
Save