diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index b329f702b..279bb2cad 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.redisson.ClusterServersConfig; import org.redisson.Config; @@ -37,6 +38,7 @@ import org.redisson.cluster.ClusterNodeInfo.Flag; import org.redisson.connection.CRC16; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.MasterSlaveEntry; +import org.redisson.connection.SingleEntry; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +58,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private ScheduledFuture monitorFuture; + private final boolean isReadFromSlaves; + public ClusterConnectionManager(ClusterServersConfig cfg, Config config) { + isReadFromSlaves = cfg.isReadFromSlaves(); connectListener = new ClusterConnectionListener(cfg.isReadFromSlaves()); init(config); @@ -144,26 +149,41 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } MasterSlaveServersConfig config = create(cfg); - log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); config.setMasterAddress(partition.getMasterAddress()); - config.setSlaveAddresses(partition.getSlaveAddresses()); - log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges()); + final AtomicReference entry = new AtomicReference(); + List> futures = new ArrayList>(); + if (isReadFromSlaves) { + config.setSlaveAddresses(partition.getSlaveAddresses()); + + MasterSlaveEntry e = new MasterSlaveEntry(partition.getSlotRanges(), this, config); + List> fs = e.initSlaveBalancer(config); + futures.addAll(fs); + entry.set(e); + + log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges()); + } else { + SingleEntry e = new SingleEntry(partition.getSlotRanges(), this, config); + entry.set(e); + } - final MasterSlaveEntry entry = new MasterSlaveEntry(partition.getSlotRanges(), this, config); - List> fs = entry.initSlaveBalancer(config); - Future f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); + Future f = entry.get().setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); f.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } for (ClusterSlotRange slotRange : partition.getSlotRanges()) { - addEntry(slotRange, entry); + addEntry(slotRange, entry.get()); lastPartitions.put(slotRange, partition); } + + log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); } }); - fs.add(f); - return fs; + futures.add(f); + return futures; } private void monitorClusterChange(final ClusterServersConfig cfg) {