refactoring

pull/4031/head
Nikita Koksharov 3 years ago
parent c916686a61
commit b318f3c237

@ -15,27 +15,8 @@
*/
package org.redisson.client;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import org.redisson.client.handler.RedisChannelInitializer;
import org.redisson.client.handler.RedisChannelInitializer.Type;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.*;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.group.ChannelGroup;
@ -51,6 +32,18 @@ import io.netty.util.NetUtil;
import io.netty.util.Timer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import org.redisson.api.RFuture;
import org.redisson.client.handler.RedisChannelInitializer;
import org.redisson.client.handler.RedisChannelInitializer.Type;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
/**
* Low-level Redis client
@ -321,12 +314,19 @@ public final class RedisClient {
}
public void shutdown() {
shutdownAsync().syncUninterruptibly();
shutdownAsync().join();
try {
shutdownAsync().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw (RuntimeException)(e.getCause());
}
}
public RFuture<Void> shutdownAsync() {
public CompletableFuture<Void> shutdownAsync() {
shutdown = true;
RPromise<Void> result = new RedissonPromise<Void>();
CompletableFuture<Void> result = new CompletableFuture<>();
if (channels.isEmpty() || config.getGroup().isShuttingDown()) {
shutdown(result);
return result;
@ -337,7 +337,7 @@ public final class RedisClient {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
result.completeExceptionally(future.cause());
return;
}
@ -359,9 +359,9 @@ public final class RedisClient {
return shutdown;
}
private void shutdown(RPromise<Void> result) {
private void shutdown(CompletableFuture<Void> result) {
if (!hasOwnTimer && !hasOwnExecutor && !hasOwnResolver && !hasOwnGroup) {
result.trySuccess(null);
result.complete(null);
} else {
Thread t = new Thread() {
@Override
@ -383,11 +383,11 @@ public final class RedisClient {
bootstrap.config().group().shutdownGracefully();
}
} catch (Exception e) {
result.tryFailure(e);
result.completeExceptionally(e);
return;
}
result.trySuccess(null);
result.complete(null);
}
};
t.start();

@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
@ -118,7 +119,7 @@ public class ClientConnectionsEntry {
firstFailTime.compareAndSet(0, System.currentTimeMillis());
}
public RFuture<Void> shutdownAsync() {
public CompletableFuture<Void> shutdownAsync() {
connectionManager.getConnectionWatcher().remove(this);
return client.shutdownAsync();
}

@ -30,11 +30,12 @@ import io.netty.resolver.AddressResolver;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.DefaultAddressResolverGroup;
import io.netty.resolver.dns.DnsServerAddressStreamProviders;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.*;
import io.netty.util.concurrent.*;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.*;
import io.netty.util.internal.PlatformDependent;
import org.redisson.ElementsSubscribeService;
import org.redisson.Version;
@ -45,7 +46,10 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.*;
import org.redisson.misc.*;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.PublishSubscribeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -210,7 +214,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected void closeNodeConnections() {
nodeConnections.values().stream()
.map(c -> c.getRedisClient().shutdownAsync())
.forEach(f -> f.syncUninterruptibly());
.forEach(f -> f.join());
}
protected void closeNodeConnection(RedisConnection conn) {

@ -30,7 +30,10 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.connection.pool.MasterPubSubConnectionPool;
import org.redisson.misc.*;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -487,9 +490,9 @@ public class MasterSlaveEntry {
List<CompletableFuture<Void>> futures = new ArrayList<>();
if (masterEntry != null) {
futures.add(masterEntry.shutdownAsync().toCompletableFuture());
futures.add(masterEntry.shutdownAsync());
}
futures.add(slaveBalancer.shutdownAsync().toCompletableFuture());
futures.add(slaveBalancer.shutdownAsync());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

@ -714,7 +714,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
sentinels.values().stream()
.map(s -> s.shutdownAsync())
.forEach(f -> f.syncUninterruptibly());
.forEach(f -> f.join());
super.shutdown();
}

@ -15,14 +15,6 @@
*/
package org.redisson.connection.balancer;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
@ -42,6 +34,13 @@ import org.redisson.misc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
/**
*
* @author Nikita Koksharov
@ -285,16 +284,16 @@ public class LoadBalancerManager {
slaveConnectionPool.returnConnection(entry, connection);
}
public RFuture<Void> shutdownAsync() {
public CompletableFuture<Void> shutdownAsync() {
if (client2Entry.values().isEmpty()) {
return RedissonPromise.<Void>newSucceededFuture(null);
return CompletableFuture.completedFuture(null);
}
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, client2Entry.values().size());
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (ClientConnectionsEntry entry : client2Entry.values()) {
entry.shutdownAsync().onComplete(listener);
futures.add(entry.shutdownAsync());
}
return result;
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
}

Loading…
Cancel
Save