|
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
|
|
|
|
import org.redisson.api.RFuture;
|
|
|
|
|
import org.redisson.client.RedisClient;
|
|
|
|
|
import org.redisson.client.RedisConnection;
|
|
|
|
|
import org.redisson.client.RedisConnectionException;
|
|
|
|
@ -46,13 +47,13 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
|
|
|
|
|
import org.redisson.connection.MasterSlaveConnectionManager;
|
|
|
|
|
import org.redisson.connection.MasterSlaveEntry;
|
|
|
|
|
import org.redisson.connection.SingleEntry;
|
|
|
|
|
import org.redisson.misc.RPromise;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
|
|
import io.netty.util.concurrent.Future;
|
|
|
|
|
import io.netty.util.concurrent.FutureListener;
|
|
|
|
|
import io.netty.util.concurrent.GlobalEventExecutor;
|
|
|
|
|
import io.netty.util.concurrent.Promise;
|
|
|
|
|
import io.netty.util.concurrent.ScheduledFuture;
|
|
|
|
|
import io.netty.util.internal.PlatformDependent;
|
|
|
|
|
|
|
|
|
@ -77,7 +78,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
Exception lastException = null;
|
|
|
|
|
for (URI addr : cfg.getNodeAddresses()) {
|
|
|
|
|
Future<RedisConnection> connectionFuture = connect(cfg, addr);
|
|
|
|
|
RFuture<RedisConnection> connectionFuture = connect(cfg, addr);
|
|
|
|
|
try {
|
|
|
|
|
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
|
|
|
|
|
List<ClusterNodeInfo> nodes = connection.sync(RedisCommands.CLUSTER_NODES);
|
|
|
|
@ -93,21 +94,21 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
lastClusterNode = addr;
|
|
|
|
|
|
|
|
|
|
Collection<ClusterPartition> partitions = parsePartitions(nodes);
|
|
|
|
|
List<Future<Collection<Future<Void>>>> futures = new ArrayList<Future<Collection<Future<Void>>>>();
|
|
|
|
|
List<RFuture<Collection<RFuture<Void>>>> futures = new ArrayList<RFuture<Collection<RFuture<Void>>>>();
|
|
|
|
|
for (ClusterPartition partition : partitions) {
|
|
|
|
|
if (partition.isMasterFail()) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
Future<Collection<Future<Void>>> masterFuture = addMasterEntry(partition, cfg);
|
|
|
|
|
RFuture<Collection<RFuture<Void>>> masterFuture = addMasterEntry(partition, cfg);
|
|
|
|
|
futures.add(masterFuture);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (Future<Collection<Future<Void>>> masterFuture : futures) {
|
|
|
|
|
for (RFuture<Collection<RFuture<Void>>> masterFuture : futures) {
|
|
|
|
|
masterFuture.awaitUninterruptibly();
|
|
|
|
|
if (!masterFuture.isSuccess()) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
for (Future<Void> future : masterFuture.getNow()) {
|
|
|
|
|
for (RFuture<Void> future : masterFuture.getNow()) {
|
|
|
|
|
future.awaitUninterruptibly();
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
continue;
|
|
|
|
@ -140,15 +141,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Future<RedisConnection> connect(ClusterServersConfig cfg, final URI addr) {
|
|
|
|
|
private RFuture<RedisConnection> connect(ClusterServersConfig cfg, final URI addr) {
|
|
|
|
|
RedisConnection connection = nodeConnections.get(addr);
|
|
|
|
|
if (connection != null) {
|
|
|
|
|
return newSucceededFuture(connection);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts());
|
|
|
|
|
final Promise<RedisConnection> result = newPromise();
|
|
|
|
|
Future<RedisConnection> future = client.connectAsync();
|
|
|
|
|
final RPromise<RedisConnection> result = newPromise();
|
|
|
|
|
RFuture<RedisConnection> future = client.connectAsync();
|
|
|
|
|
future.addListener(new FutureListener<RedisConnection>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<RedisConnection> future) throws Exception {
|
|
|
|
@ -158,7 +159,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RedisConnection connection = future.getNow();
|
|
|
|
|
Promise<RedisConnection> promise = newPromise();
|
|
|
|
|
RPromise<RedisConnection> promise = newPromise();
|
|
|
|
|
connectListener.onConnect(promise, connection, null, config);
|
|
|
|
|
promise.addListener(new FutureListener<RedisConnection>() {
|
|
|
|
|
@Override
|
|
|
|
@ -188,7 +189,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
protected void initEntry(MasterSlaveServersConfig config) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Future<Collection<Future<Void>>> addMasterEntry(final ClusterPartition partition, final ClusterServersConfig cfg) {
|
|
|
|
|
private RFuture<Collection<RFuture<Void>>> addMasterEntry(final ClusterPartition partition, final ClusterServersConfig cfg) {
|
|
|
|
|
if (partition.isMasterFail()) {
|
|
|
|
|
RedisException e = new RedisException("Failed to add master: " +
|
|
|
|
|
partition.getMasterAddress() + " for slot ranges: " +
|
|
|
|
@ -201,8 +202,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
return newFailedFuture(e);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final Promise<Collection<Future<Void>>> result = newPromise();
|
|
|
|
|
Future<RedisConnection> connectionFuture = connect(cfg, partition.getMasterAddress());
|
|
|
|
|
final RPromise<Collection<RFuture<Void>>> result = newPromise();
|
|
|
|
|
RFuture<RedisConnection> connectionFuture = connect(cfg, partition.getMasterAddress());
|
|
|
|
|
connectionFuture.addListener(new FutureListener<RedisConnection>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<RedisConnection> future) throws Exception {
|
|
|
|
@ -213,7 +214,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final RedisConnection connection = future.getNow();
|
|
|
|
|
Future<Map<String, String>> clusterFuture = connection.async(RedisCommands.CLUSTER_INFO);
|
|
|
|
|
RFuture<Map<String, String>> clusterFuture = connection.async(RedisCommands.CLUSTER_INFO);
|
|
|
|
|
clusterFuture.addListener(new FutureListener<Map<String, String>>() {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -238,7 +239,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
config.setMasterAddress(partition.getMasterAddress());
|
|
|
|
|
|
|
|
|
|
final MasterSlaveEntry e;
|
|
|
|
|
List<Future<Void>> futures = new ArrayList<Future<Void>>();
|
|
|
|
|
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
|
|
|
|
|
if (config.getReadMode() == ReadMode.MASTER) {
|
|
|
|
|
e = new SingleEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config);
|
|
|
|
|
} else {
|
|
|
|
@ -246,7 +247,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
e = new MasterSlaveEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config);
|
|
|
|
|
|
|
|
|
|
List<Future<Void>> fs = e.initSlaveBalancer(partition.getFailedSlaveAddresses());
|
|
|
|
|
List<RFuture<Void>> fs = e.initSlaveBalancer(partition.getFailedSlaveAddresses());
|
|
|
|
|
futures.addAll(fs);
|
|
|
|
|
if (!partition.getSlaveAddresses().isEmpty()) {
|
|
|
|
|
log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());
|
|
|
|
@ -256,8 +257,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Future<Void> f = e.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
|
|
|
|
|
final Promise<Void> initFuture = newPromise();
|
|
|
|
|
RFuture<Void> f = e.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
|
|
|
|
|
final RPromise<Void> initFuture = newPromise();
|
|
|
|
|
futures.add(initFuture);
|
|
|
|
|
f.addListener(new FutureListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
@ -327,7 +328,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
final URI uri = iterator.next();
|
|
|
|
|
Future<RedisConnection> connectionFuture = connect(cfg, uri);
|
|
|
|
|
RFuture<RedisConnection> connectionFuture = connect(cfg, uri);
|
|
|
|
|
connectionFuture.addListener(new FutureListener<RedisConnection>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<RedisConnection> future) throws Exception {
|
|
|
|
@ -344,7 +345,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator<URI> iterator, final URI uri) {
|
|
|
|
|
Future<List<ClusterNodeInfo>> future = connection.async(RedisCommands.CLUSTER_NODES);
|
|
|
|
|
RFuture<List<ClusterNodeInfo>> future = connection.async(RedisCommands.CLUSTER_NODES);
|
|
|
|
|
future.addListener(new FutureListener<List<ClusterNodeInfo>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<List<ClusterNodeInfo>> future) throws Exception {
|
|
|
|
@ -367,7 +368,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final Collection<ClusterPartition> newPartitions = parsePartitions(nodes);
|
|
|
|
|
Future<Void> masterFuture = checkMasterNodesChange(cfg, newPartitions);
|
|
|
|
|
RFuture<Void> masterFuture = checkMasterNodesChange(cfg, newPartitions);
|
|
|
|
|
checkSlaveNodesChange(newPartitions);
|
|
|
|
|
masterFuture.addListener(new FutureListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
@ -434,7 +435,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
Set<URI> addedSlaves = new HashSet<URI>(newPart.getSlaveAddresses());
|
|
|
|
|
addedSlaves.removeAll(currentPart.getSlaveAddresses());
|
|
|
|
|
for (final URI uri : addedSlaves) {
|
|
|
|
|
Future<Void> future = entry.addSlave(uri.getHost(), uri.getPort());
|
|
|
|
|
RFuture<Void> future = entry.addSlave(uri.getHost(), uri.getPort());
|
|
|
|
|
future.addListener(new FutureListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Void> future) throws Exception {
|
|
|
|
@ -470,7 +471,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Future<Void> checkMasterNodesChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
|
|
|
|
|
private RFuture<Void> checkMasterNodesChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
|
|
|
|
|
List<ClusterPartition> newMasters = new ArrayList<ClusterPartition>();
|
|
|
|
|
for (final ClusterPartition newPart : newPartitions) {
|
|
|
|
|
boolean masterFound = false;
|
|
|
|
@ -509,21 +510,21 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
return newSucceededFuture(null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final Promise<Void> result = newPromise();
|
|
|
|
|
final RPromise<Void> result = newPromise();
|
|
|
|
|
final AtomicInteger masters = new AtomicInteger(newMasters.size());
|
|
|
|
|
final Queue<Future<Void>> futures = new ConcurrentLinkedQueue<Future<Void>>();
|
|
|
|
|
final Queue<RFuture<Void>> futures = new ConcurrentLinkedQueue<RFuture<Void>>();
|
|
|
|
|
for (ClusterPartition newPart : newMasters) {
|
|
|
|
|
Future<Collection<Future<Void>>> future = addMasterEntry(newPart, cfg);
|
|
|
|
|
future.addListener(new FutureListener<Collection<Future<Void>>>() {
|
|
|
|
|
RFuture<Collection<RFuture<Void>>> future = addMasterEntry(newPart, cfg);
|
|
|
|
|
future.addListener(new FutureListener<Collection<RFuture<Void>>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Collection<Future<Void>>> future) throws Exception {
|
|
|
|
|
public void operationComplete(Future<Collection<RFuture<Void>>> future) throws Exception {
|
|
|
|
|
if (future.isSuccess()) {
|
|
|
|
|
futures.addAll(future.getNow());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (masters.decrementAndGet() == 0) {
|
|
|
|
|
final AtomicInteger nodes = new AtomicInteger(futures.size());
|
|
|
|
|
for (Future<Void> nodeFuture : futures) {
|
|
|
|
|
for (RFuture<Void> nodeFuture : futures) {
|
|
|
|
|
nodeFuture.addListener(new FutureListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Void> future) throws Exception {
|
|
|
|
|