RemoteSerivce shutdown process optimization. #446

pull/499/head
Nikita 9 years ago
parent 7884c07b09
commit 5e2cbe6a2f

@ -87,7 +87,6 @@ public class Redisson implements RedissonClient {
private final CommandExecutor commandExecutor;
private final ConnectionManager connectionManager;
private final Config config;
private final RedissonRemoteService remoteService;
private final UUID id = UUID.randomUUID();
@ -115,7 +114,6 @@ public class Redisson implements RedissonClient {
}
commandExecutor = new CommandSyncService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
remoteService = new RedissonRemoteService(this);
}
private void validate(SingleServerConfig config) {
@ -372,7 +370,7 @@ public class Redisson implements RedissonClient {
}
public RRemoteService getRemoteSerivce() {
return remoteService;
return new RedissonRemoteService(this);
}
@Override
@ -507,7 +505,6 @@ public class Redisson implements RedissonClient {
@Override
public void shutdown() {
remoteService.shutdown();
connectionManager.shutdown();
}

@ -19,8 +19,6 @@ import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -43,7 +41,6 @@ public class RedissonRemoteService implements RRemoteService {
private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();
private final Queue<Future<RemoteServiceRequest>> futures = new ConcurrentLinkedQueue<Future<RemoteServiceRequest>>();
private final Redisson redisson;
@ -78,7 +75,6 @@ public class RedissonRemoteService implements RRemoteService {
private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue) {
Future<RemoteServiceRequest> take = requestQueue.takeAsync();
futures.add(take);
take.addListener(new FutureListener<RemoteServiceRequest>() {
@Override
public void operationComplete(Future<RemoteServiceRequest> future) throws Exception {
@ -104,7 +100,6 @@ public class RedissonRemoteService implements RRemoteService {
log.error("None of clients has not received a response for: {}", request);
}
futures.remove(future);
subscribe(remoteInterface, requestQueue);
}
});
@ -161,10 +156,4 @@ public class RedissonRemoteService implements RRemoteService {
return ByteBufUtil.hexDump(id);
}
public void shutdown() {
for (Future<RemoteServiceRequest> future : futures) {
future.cancel(true);
}
}
}

@ -462,18 +462,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
int timeoutTime = connectionManager.getConfig().getTimeout();
if (skipTimeout.contains(details.getCommand().getName())) {
Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString());
details.getMainPromise().addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (!future.isCancelled()) {
return;
}
// cancel handling for commands from skipTimeout collection
if (details.getAttemptPromise().cancel(true)) {
connection.forceReconnectAsync();
}
}
});
handleBlockingOperations(details, connection);
if (popTimeout == 0) {
return;
}
@ -494,6 +483,29 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.setTimeout(timeout);
}
private <R, V> void handleBlockingOperations(final AsyncDetails<V, R> details, final RedisConnection connection) {
final FutureListener<Boolean> listener = new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
details.getMainPromise().cancel(true);
}
};
details.getMainPromise().addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (!future.isCancelled()) {
connectionManager.getShutdownPromise().removeListener(listener);
return;
}
// cancel handling for commands from skipTimeout collection
if (details.getAttemptPromise().cancel(true)) {
connection.forceReconnectAsync();
}
}
});
connectionManager.getShutdownPromise().addListener(listener);
}
private <R, V> void checkConnectionFuture(final NodeSource source,
final AsyncDetails<V, R> details) {
if (details.getAttemptPromise().isDone() || details.getMainPromise().isCancelled() || details.getConnectionFuture().isCancelled()) {

@ -107,5 +107,7 @@ public interface ConnectionManager {
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
InfinitySemaphoreLatch getShutdownLatch();
Future<Boolean> getShutdownPromise();
}

@ -122,6 +122,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected final Map<ClusterSlotRange, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap();
private final Promise<Boolean> shutdownPromise;
private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch();
private final Set<RedisClientEntry> clients = Collections.newSetFromMap(PlatformDependent.<RedisClientEntry, Boolean>newConcurrentHashMap());
@ -156,6 +158,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.socketChannelClass = NioSocketChannel.class;
}
this.codec = cfg.getCodec();
this.shutdownPromise = group.next().newPromise();
this.isClusterMode = cfg.isClusterConfig();
}
@ -674,6 +677,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public void shutdown() {
shutdownPromise.trySuccess(true);
shutdownLatch.closeAndAwaitUninterruptibly();
for (MasterSlaveEntry entry : entries.values()) {
entry.shutdown();
@ -731,6 +735,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
public InfinitySemaphoreLatch getShutdownLatch() {
return shutdownLatch;
}
@Override
public Future<Boolean> getShutdownPromise() {
return shutdownPromise;
}
@Override
public ConnectionEventsHub getConnectionEventsHub() {

Loading…
Cancel
Save