Fixed - RedissonNode shutdown process. #1291

pull/1461/head
Nikita 7 years ago
parent 9a31ce81a2
commit 8ea5a5d1c8

@ -582,7 +582,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (Integer slot : removedSlots) {
MasterSlaveEntry entry = removeEntry(slot);
if (entry.getSlotRanges().isEmpty()) {
entry.shutdownMasterAsync();
entry.shutdownAsync();
log.info("{} master and slaves for it removed", entry.getClient().getAddr());
}
}

@ -67,8 +67,6 @@ public interface ConnectionManager {
IdleConnectionWatcher getConnectionWatcher();
void shutdownAsync(RedisClient client);
int calcSlot(String key);
MasterSlaveServersConfig getConfig();

@ -46,11 +46,11 @@ import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.TransportMode;
import org.redisson.misc.CountableListener;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.PublishSubscribeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -435,11 +435,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return client;
}
@Override
public void shutdownAsync(RedisClient client) {
client.shutdownAsync();
}
@Override
public RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname) {
RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
@ -632,17 +627,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (dnsMonitor != null) {
dnsMonitor.stop();
}
resolverGroup.close();
timer.stop();
shutdownLatch.close();
shutdownPromise.trySuccess(true);
shutdownLatch.awaitUninterruptibly();
for (MasterSlaveEntry entry : getEntrySet()) {
entry.shutdown();
}
if (cfg.getExecutor() == null) {
executor.shutdown();
try {
@ -651,8 +637,20 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
Thread.currentThread().interrupt();
}
}
timer.stop();
resolverGroup.close();
shutdownLatch.close();
shutdownPromise.trySuccess(true);
shutdownLatch.awaitUninterruptibly();
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, getEntrySet().size());
for (MasterSlaveEntry entry : getEntrySet()) {
entry.shutdownAsync().addListener(listener);
}
result.awaitUninterruptibly(timeout, unit);
if (cfg.getEventLoopGroup() == null) {
group.shutdownGracefully(quietPeriod, timeout, unit).syncUninterruptibly();

@ -139,25 +139,28 @@ public class MasterSlaveEntry {
return;
}
masterEntry = new ClientConnectionsEntry(
client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
NodeType.MASTER);
CountableListener<RedisClient> listener = new CountableListener<RedisClient>(result, client);
RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
listener.incCounter();
writeFuture.addListener(listener);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
listener.incCounter();
pubSubFuture.addListener(listener);
}
masterEntry = new ClientConnectionsEntry(
client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
NodeType.MASTER);
int counter = 1;
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
counter++;
}
CountableListener<RedisClient> listener = new CountableListener<RedisClient>(result, client, counter);
RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
writeFuture.addListener(listener);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
pubSubFuture.addListener(listener);
}
}
});
@ -465,19 +468,22 @@ public class MasterSlaveEntry {
&& slaveBalancer.getAvailableClients() > 1) {
slaveDown(newMasterClient.getAddr(), FreezeReason.SYSTEM);
}
connectionManager.shutdownAsync(oldMaster.getClient());
oldMaster.getClient().shutdownAsync();
log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), masterEntry.getClient().getAddr());
}
});
}
public void shutdownMasterAsync() {
public RFuture<Void> shutdownAsync() {
if (!active.compareAndSet(true, false)) {
return;
return RedissonPromise.<Void>newSucceededFuture(null);
}
connectionManager.shutdownAsync(masterEntry.getClient());
slaveBalancer.shutdownAsync();
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, 2);
masterEntry.getClient().shutdownAsync().addListener(listener);
slaveBalancer.shutdownAsync().addListener(listener);
return result;
}
public RFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) {
@ -526,15 +532,6 @@ public class MasterSlaveEntry {
slaveBalancer.returnConnection(connection);
}
public void shutdown() {
if (!active.compareAndSet(true, false)) {
return;
}
masterEntry.getClient().shutdown();
slaveBalancer.shutdown();
}
public void addSlotRange(Integer range) {
slots.add(range);
}

@ -83,19 +83,17 @@ public class LoadBalancerManager {
public RFuture<Void> add(final ClientConnectionsEntry entry) {
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null) {
CountableListener<Void> listener = new CountableListener<Void>(result, null, 2) {
@Override
protected void onSuccess(Void value) {
client2Entry.put(entry.getClient(), entry);
}
client2Entry.put(entry.getClient(), entry);
}
};
RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);
listener.incCounter();
slaveFuture.addListener(listener);
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);
listener.incCounter();
pubSubFuture.addListener(listener);
return result;
}
@ -249,16 +247,16 @@ public class LoadBalancerManager {
slaveConnectionPool.returnConnection(entry, connection);
}
public void shutdown() {
for (ClientConnectionsEntry entry : client2Entry.values()) {
entry.getClient().shutdown();
public RFuture<Void> shutdownAsync() {
if (client2Entry.values().isEmpty()) {
return RedissonPromise.<Void>newSucceededFuture(null);
}
}
public void shutdownAsync() {
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, client2Entry.values().size());
for (ClientConnectionsEntry entry : client2Entry.values()) {
connectionManager.shutdownAsync(entry.getClient());
entry.getClient().shutdownAsync().addListener(listener);
}
return result;
}
}

@ -36,19 +36,19 @@ public class CountableListener<T> implements FutureListener<Object> {
}
public CountableListener(RPromise<T> result, T value) {
super();
this(null, null, 0);
}
public CountableListener(RPromise<T> result, T value, int count) {
this.result = result;
this.value = value;
this.counter.set(count);
}
public void setCounter(int newValue) {
counter.set(newValue);
}
public void incCounter() {
counter.incrementAndGet();
}
public void decCounter() {
if (counter.decrementAndGet() == 0) {
onSuccess(value);

@ -470,11 +470,9 @@ public class RedissonTransaction implements RTransaction {
}
final CountableListener<Map<HashKey, HashValue>> listener =
new CountableListener<Map<HashKey, HashValue>>(result, hashes);
listener.setCounter(hashes.size());
new CountableListener<Map<HashKey, HashValue>>(result, hashes, hashes.size());
RPromise<Void> subscriptionFuture = new RedissonPromise<Void>();
final CountableListener<Void> subscribedFutures = new CountableListener<Void>(subscriptionFuture, null);
subscribedFutures.setCounter(hashes.size());
final CountableListener<Void> subscribedFutures = new CountableListener<Void>(subscriptionFuture, null, hashes.size());
final List<RTopic<Object>> topics = new ArrayList<RTopic<Object>>();
for (final Entry<HashKey, HashValue> entry : hashes.entrySet()) {

Loading…
Cancel
Save