connection handling when isReadFromSlaves = false fixed. #345

pull/365/head
Nikita 9 years ago
parent db8b2e7f78
commit 3446ec6962

@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.ClusterServersConfig; import org.redisson.ClusterServersConfig;
import org.redisson.Config; import org.redisson.Config;
@ -37,6 +38,7 @@ import org.redisson.cluster.ClusterNodeInfo.Flag;
import org.redisson.connection.CRC16; import org.redisson.connection.CRC16;
import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.SingleEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -56,7 +58,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private ScheduledFuture<?> monitorFuture; private ScheduledFuture<?> monitorFuture;
private final boolean isReadFromSlaves;
public ClusterConnectionManager(ClusterServersConfig cfg, Config config) { public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
isReadFromSlaves = cfg.isReadFromSlaves();
connectListener = new ClusterConnectionListener(cfg.isReadFromSlaves()); connectListener = new ClusterConnectionListener(cfg.isReadFromSlaves());
init(config); init(config);
@ -144,26 +149,41 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
MasterSlaveServersConfig config = create(cfg); MasterSlaveServersConfig config = create(cfg);
log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
config.setMasterAddress(partition.getMasterAddress()); config.setMasterAddress(partition.getMasterAddress());
final AtomicReference<MasterSlaveEntry> entry = new AtomicReference<MasterSlaveEntry>();
List<Future<Void>> futures = new ArrayList<Future<Void>>();
if (isReadFromSlaves) {
config.setSlaveAddresses(partition.getSlaveAddresses()); config.setSlaveAddresses(partition.getSlaveAddresses());
MasterSlaveEntry e = new MasterSlaveEntry(partition.getSlotRanges(), this, config);
List<Future<Void>> fs = e.initSlaveBalancer(config);
futures.addAll(fs);
entry.set(e);
log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges()); log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());
} else {
SingleEntry e = new SingleEntry(partition.getSlotRanges(), this, config);
entry.set(e);
}
final MasterSlaveEntry entry = new MasterSlaveEntry(partition.getSlotRanges(), this, config); Future<Void> f = entry.get().setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
List<Future<Void>> fs = entry.initSlaveBalancer(config);
Future<Void> f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
f.addListener(new FutureListener<Void>() { f.addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
return;
}
for (ClusterSlotRange slotRange : partition.getSlotRanges()) { for (ClusterSlotRange slotRange : partition.getSlotRanges()) {
addEntry(slotRange, entry); addEntry(slotRange, entry.get());
lastPartitions.put(slotRange, partition); lastPartitions.put(slotRange, partition);
} }
log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
} }
}); });
fs.add(f); futures.add(f);
return fs; return futures;
} }
private void monitorClusterChange(final ClusterServersConfig cfg) { private void monitorClusterChange(final ClusterServersConfig cfg) {

Loading…
Cancel
Save