|
|
@ -189,7 +189,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
public void run() {
|
|
|
|
public void run() {
|
|
|
|
List<RedisClient> sentinels = new ArrayList<>(SentinelConnectionManager.this.sentinels.values());
|
|
|
|
List<RedisClient> sentinels = new ArrayList<>(SentinelConnectionManager.this.sentinels.values());
|
|
|
|
|
|
|
|
|
|
|
|
final AtomicInteger sentinelsCounter = new AtomicInteger(sentinels.size());
|
|
|
|
AtomicInteger sentinelsCounter = new AtomicInteger(sentinels.size());
|
|
|
|
FutureListener<List<InetSocketAddress>> commonListener = new FutureListener<List<InetSocketAddress>>() {
|
|
|
|
FutureListener<List<InetSocketAddress>> commonListener = new FutureListener<List<InetSocketAddress>>() {
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
|
|
|
|
public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
|
|
|
@ -199,7 +199,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
for (final RedisClient client : sentinels) {
|
|
|
|
for (RedisClient client : sentinels) {
|
|
|
|
Future<List<InetSocketAddress>> allNodes = sentinelResolver.resolveAll(InetSocketAddress.createUnresolved(client.getAddr().getHostName(), client.getAddr().getPort()));
|
|
|
|
Future<List<InetSocketAddress>> allNodes = sentinelResolver.resolveAll(InetSocketAddress.createUnresolved(client.getAddr().getHostName(), client.getAddr().getPort()));
|
|
|
|
allNodes.addListener(new FutureListener<List<InetSocketAddress>>() {
|
|
|
|
allNodes.addListener(new FutureListener<List<InetSocketAddress>>() {
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
@ -244,7 +244,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
}, config.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS);
|
|
|
|
}, config.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void scheduleChangeCheck(final SentinelServersConfig cfg, final Iterator<RedisClient> iterator) {
|
|
|
|
private void scheduleChangeCheck(SentinelServersConfig cfg, Iterator<RedisClient> iterator) {
|
|
|
|
monitorFuture = group.schedule(new Runnable() {
|
|
|
|
monitorFuture = group.schedule(new Runnable() {
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
public void run() {
|
|
|
@ -261,7 +261,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
|
|
|
|
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void checkState(final SentinelServersConfig cfg, final Iterator<RedisClient> iterator, final AtomicReference<Throwable> lastException) {
|
|
|
|
private void checkState(SentinelServersConfig cfg, Iterator<RedisClient> iterator, AtomicReference<Throwable> lastException) {
|
|
|
|
if (!iterator.hasNext()) {
|
|
|
|
if (!iterator.hasNext()) {
|
|
|
|
if (lastException.get() != null) {
|
|
|
|
if (lastException.get() != null) {
|
|
|
|
log.error("Can't update cluster state", lastException.get());
|
|
|
|
log.error("Can't update cluster state", lastException.get());
|
|
|
@ -288,7 +288,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void updateState(final SentinelServersConfig cfg, final RedisConnection connection, final Iterator<RedisClient> iterator) {
|
|
|
|
private void updateState(SentinelServersConfig cfg, RedisConnection connection, Iterator<RedisClient> iterator) {
|
|
|
|
AtomicInteger commands = new AtomicInteger(2);
|
|
|
|
AtomicInteger commands = new AtomicInteger(2);
|
|
|
|
BiConsumer<Object, Throwable> commonListener = new BiConsumer<Object, Throwable>() {
|
|
|
|
BiConsumer<Object, Throwable> commonListener = new BiConsumer<Object, Throwable>() {
|
|
|
|
|
|
|
|
|
|
|
@ -317,8 +317,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
final String current = currentMaster.get();
|
|
|
|
String current = currentMaster.get();
|
|
|
|
final String newMaster = createAddress(master.get(0), master.get(1));
|
|
|
|
String newMaster = createAddress(master.get(0), master.get(1));
|
|
|
|
if (!newMaster.equals(current)
|
|
|
|
if (!newMaster.equals(current)
|
|
|
|
&& currentMaster.compareAndSet(current, newMaster)) {
|
|
|
|
&& currentMaster.compareAndSet(current, newMaster)) {
|
|
|
|
RFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), URIBuilder.create(newMaster));
|
|
|
|
RFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), URIBuilder.create(newMaster));
|
|
|
@ -437,7 +437,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
return entry;
|
|
|
|
return entry;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private RFuture<Void> registerSentinel(final URI addr, final MasterSlaveServersConfig c) {
|
|
|
|
private RFuture<Void> registerSentinel(URI addr, MasterSlaveServersConfig c) {
|
|
|
|
String key = addr.getHost() + ":" + addr.getPort();
|
|
|
|
String key = addr.getHost() + ":" + addr.getPort();
|
|
|
|
RedisClient sentinel = sentinels.get(key);
|
|
|
|
RedisClient sentinel = sentinels.get(key);
|
|
|
|
if (sentinel != null) {
|
|
|
|
if (sentinel != null) {
|
|
|
|