|
|
|
@ -95,11 +95,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
RedisConnection connection = connectionFuture.toCompletableFuture()
|
|
|
|
|
.get(config.getConnectTimeout(), TimeUnit.MILLISECONDS);
|
|
|
|
|
|
|
|
|
|
connection.async(RedisCommands.SPUBLISH, "", "").thenAccept(r -> {
|
|
|
|
|
subscribeService.setShardingSupported(true);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (cfg.getNodeAddresses().size() == 1 && !addr.isIP()) {
|
|
|
|
|
configEndpointHostName = addr.getHost();
|
|
|
|
|
}
|
|
|
|
@ -135,7 +130,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
if (partition.getMasterAddress() == null) {
|
|
|
|
|
throw new IllegalStateException("Master node: " + partition.getNodeId() + " doesn't have address.");
|
|
|
|
|
throw new IllegalStateException("Master node: " + partition.getNodeId() + " doesn't have an address.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CompletionStage<Void> masterFuture = addMasterEntry(partition, cfg);
|
|
|
|
@ -176,9 +171,23 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
detectSharding();
|
|
|
|
|
scheduleClusterChangeCheck(cfg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void detectSharding() {
|
|
|
|
|
MasterSlaveEntry entry = getEntrySet().iterator().next();
|
|
|
|
|
RedisConnection c = entry.connectionWriteOp(null).join();
|
|
|
|
|
try {
|
|
|
|
|
c.sync(RedisCommands.SPUBLISH, "", "");
|
|
|
|
|
subscribeService.setShardingSupported(true);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
// skip
|
|
|
|
|
} finally {
|
|
|
|
|
entry.releaseWrite(c);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Collection<MasterSlaveEntry> getEntrySet() {
|
|
|
|
|
lazyConnect();
|
|
|
|
|