Merge branch 'master' into 3.0.0

pull/1303/head
Nikita 7 years ago
commit fb04f36d98

@ -110,31 +110,40 @@ public class MasterSlaveEntry {
} }
public RPromise<RedisClient> setupMasterEntry(URI address) { public RPromise<RedisClient> setupMasterEntry(URI address) {
final RPromise<RedisClient> result = new RedissonPromise<RedisClient>();
RedisClient client = connectionManager.createClient(NodeType.MASTER, address); RedisClient client = connectionManager.createClient(NodeType.MASTER, address);
masterEntry = new ClientConnectionsEntry(
client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
NodeType.MASTER);
RPromise<RedisClient> result = new RedissonPromise<RedisClient>();
CountableListener<RedisClient> listener = new CountableListener<RedisClient>(result, client);
RFuture<InetSocketAddress> addrFuture = client.resolveAddr(); RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
listener.incCounter(); addrFuture.addListener(new FutureListener<InetSocketAddress>() {
addrFuture.addListener(listener);
RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry); @Override
listener.incCounter(); public void operationComplete(Future<InetSocketAddress> future) throws Exception {
writeFuture.addListener(listener); if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { masterEntry = new ClientConnectionsEntry(
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry); client,
listener.incCounter(); config.getMasterConnectionMinimumIdleSize(),
pubSubFuture.addListener(listener); config.getMasterConnectionPoolSize(),
} config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
NodeType.MASTER);
CountableListener<RedisClient> listener = new CountableListener<RedisClient>(result, client);
RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
listener.incCounter();
writeFuture.addListener(listener);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
listener.incCounter();
pubSubFuture.addListener(listener);
}
}
});
return result; return result;
} }
@ -224,7 +233,6 @@ public class MasterSlaveEntry {
return; return;
} }
System.out.println("channelName " + channelName + " resubscribed!");
subscribeCodec.addListener(new FutureListener<Codec>() { subscribeCodec.addListener(new FutureListener<Codec>() {
@Override @Override
public void operationComplete(Future<Codec> future) throws Exception { public void operationComplete(Future<Codec> future) throws Exception {
@ -344,19 +352,8 @@ public class MasterSlaveEntry {
return addSlave(address, false, NodeType.SLAVE); return addSlave(address, false, NodeType.SLAVE);
} }
private RFuture<Void> addSlave(URI address, boolean freezed, NodeType nodeType) { private RFuture<Void> addSlave(URI address, final boolean freezed, final NodeType nodeType) {
RedisClient client = connectionManager.createClient(NodeType.SLAVE, address); RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);
final ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
this.config.getSlaveConnectionMinimumIdleSize(),
this.config.getSlaveConnectionPoolSize(),
this.config.getSubscriptionConnectionMinimumIdleSize(),
this.config.getSubscriptionConnectionPoolSize(), connectionManager, nodeType);
if (freezed) {
synchronized (entry) {
entry.setFreezed(freezed);
entry.setFreezeReason(FreezeReason.SYSTEM);
}
}
final RPromise<Void> result = new RedissonPromise<Void>(); final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<InetSocketAddress> addrFuture = client.resolveAddr(); RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
@ -368,6 +365,17 @@ public class MasterSlaveEntry {
return; return;
} }
ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
config.getSlaveConnectionMinimumIdleSize(),
config.getSlaveConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(), connectionManager, nodeType);
if (freezed) {
synchronized (entry) {
entry.setFreezed(freezed);
entry.setFreezeReason(FreezeReason.SYSTEM);
}
}
RFuture<Void> addFuture = slaveBalancer.add(entry); RFuture<Void> addFuture = slaveBalancer.add(entry);
addFuture.addListener(new TransferListener<Void>(result)); addFuture.addListener(new TransferListener<Void>(result));
} }

Loading…
Cancel
Save