Sentinel connection optimization. #338

pull/365/head
Nikita 9 years ago
parent 3587015180
commit ae72c05b93

@ -38,6 +38,8 @@ import org.redisson.misc.URIBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
public class SentinelConnectionManager extends MasterSlaveConnectionManager { public class SentinelConnectionManager extends MasterSlaveConnectionManager {
@ -129,55 +131,70 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
slaveDown(parts[0], parts[1]); slaveDown(parts[0], parts[1]);
} }
List<Future<RedisPubSubConnection>> connectionFutures = new ArrayList<Future<RedisPubSubConnection>>(cfg.getSentinelAddresses().size());
for (URI addr : cfg.getSentinelAddresses()) { for (URI addr : cfg.getSentinelAddresses()) {
registerSentinel(cfg, addr, c); Future<RedisPubSubConnection> future = registerSentinel(cfg, addr, c);
connectionFutures.add(future);
}
for (Future<RedisPubSubConnection> future : connectionFutures) {
future.syncUninterruptibly();
} }
} }
private void registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) { private Future<RedisPubSubConnection> registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) {
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout()); RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout());
RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client); RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client);
if (oldClient != null) { if (oldClient != null) {
return; return newSucceededFuture(null);
} }
try { Future<RedisPubSubConnection> pubsubFuture = client.connectPubSubAsync();
RedisPubSubConnection pubsub = client.connectPubSub(); pubsubFuture.addListener(new FutureListener<RedisPubSubConnection>() {
pubsub.addListener(new BaseRedisPubSubListener<String>() { @Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) {
log.warn("Can't connect to sentinel: {}:{}", addr.getHost(), addr.getPort());
return;
}
@Override RedisPubSubConnection pubsub = future.getNow();
public void onMessage(String channel, String msg) { pubsub.addListener(new BaseRedisPubSubListener<String>() {
if ("+sentinel".equals(channel)) {
onSentinelAdded(cfg, msg, c); @Override
} public void onMessage(String channel, String msg) {
if ("+slave".equals(channel)) { if ("+sentinel".equals(channel)) {
onSlaveAdded(addr, msg); onSentinelAdded(cfg, msg, c);
} }
if ("+sdown".equals(channel)) { if ("+slave".equals(channel)) {
onNodeDown(addr, msg); onSlaveAdded(addr, msg);
} }
if ("-sdown".equals(channel)) { if ("+sdown".equals(channel)) {
onSlaveUp(addr, msg); onNodeDown(addr, msg);
}
if ("-sdown".equals(channel)) {
onSlaveUp(addr, msg);
}
if ("+switch-master".equals(channel)) {
onMasterChange(cfg, addr, msg);
}
} }
if ("+switch-master".equals(channel)) {
onMasterChange(cfg, addr, msg);
}
}
@Override @Override
public boolean onStatus(PubSubType type, String channel) { public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.SUBSCRIBE) { if (type == PubSubType.SUBSCRIBE) {
log.debug("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort()); log.debug("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort());
}
return true;
} }
return true; });
}
});
pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave", "+sentinel"); pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave", "+sentinel");
log.info("sentinel: {}:{} added", addr.getHost(), addr.getPort()); log.info("sentinel: {}:{} added", addr.getHost(), addr.getPort());
} catch (RedisConnectionException e) { }
log.warn("can't connect to sentinel: {}:{}", addr.getHost(), addr.getPort()); });
}
return pubsubFuture;
} }
protected void onSentinelAdded(SentinelServersConfig cfg, String msg, MasterSlaveServersConfig c) { protected void onSentinelAdded(SentinelServersConfig cfg, String msg, MasterSlaveServersConfig c) {

Loading…
Cancel
Save