refactoring

pull/4031/head
Nikita Koksharov 3 years ago
parent 1f539dcace
commit 7e5365a501

@ -34,6 +34,7 @@ import io.netty.util.Timer;
import io.netty.util.TimerTask; import io.netty.util.TimerTask;
import io.netty.util.*; import io.netty.util.*;
import io.netty.util.concurrent.*; import io.netty.util.concurrent.*;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import org.redisson.ElementsSubscribeService; import org.redisson.ElementsSubscribeService;
import org.redisson.Version; import org.redisson.Version;
@ -51,10 +52,7 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -598,13 +596,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
connectionWatcher.stop(); connectionWatcher.stop();
RPromise<Void> result = new RedissonPromise<Void>(); List<CompletableFuture<Void>> futures = new ArrayList<>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, getEntrySet().size());
for (MasterSlaveEntry entry : getEntrySet()) { for (MasterSlaveEntry entry : getEntrySet()) {
entry.shutdownAsync().onComplete(listener); futures.add(entry.shutdownAsync());
}
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
try {
future.get(timeout, unit);
} catch (Exception e) {
// skip
} }
result.awaitUninterruptibly(timeout, unit);
resolverGroup.close(); resolverGroup.close();
shutdownLatch.close(); shutdownLatch.close();

@ -480,18 +480,18 @@ public class MasterSlaveEntry {
}); });
} }
public RFuture<Void> shutdownAsync() { public CompletableFuture<Void> shutdownAsync() {
if (!active.compareAndSet(true, false)) { if (!active.compareAndSet(true, false)) {
return RedissonPromise.<Void>newSucceededFuture(null); return CompletableFuture.completedFuture(null);
} }
RPromise<Void> result = new RedissonPromise<Void>(); List<CompletableFuture<Void>> futures = new ArrayList<>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, 2);
if (masterEntry != null) { if (masterEntry != null) {
masterEntry.shutdownAsync().onComplete(listener); futures.add(masterEntry.shutdownAsync().toCompletableFuture());
} }
slaveBalancer.shutdownAsync().onComplete(listener); futures.add(slaveBalancer.shutdownAsync().toCompletableFuture());
return result;
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
} }
public RFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) { public RFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) {

Loading…
Cancel
Save