refactoring

pull/1423/head
Nikita 7 years ago
parent 5eaab6e37f
commit 20323d8132

@ -147,7 +147,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final ExecutorService executor;
private final CommandSyncService commandExecutor;
private final Config cfg;
protected final DnsAddressResolverGroup resolverGroup;
@ -186,7 +186,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} else if (cfg.getTransportMode() == TransportMode.KQUEUE) {
if (cfg.getEventLoopGroup() == null) {
this.group = new KQueueEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
} else {
} else {
this.group = cfg.getEventLoopGroup();
}
@ -218,7 +218,24 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.shutdownPromise = new RedissonPromise<Boolean>();
this.commandExecutor = new CommandSyncService(this);
}
/*
* Remove it once https://github.com/netty/netty/issues/7882 get resolved
*/
protected DnsAddressResolverGroup createResolverGroup() {
if (cfg.getTransportMode() == TransportMode.EPOLL) {
return cfg.getAddressResolverGroupFactory().create(EpollDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault());
} else if (cfg.getTransportMode() == TransportMode.KQUEUE) {
return cfg.getAddressResolverGroupFactory().create(KQueueDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault());
}
return cfg.getAddressResolverGroupFactory().create(NioDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault());
}
public SlotGenerator getSlotGenerator() {
return slotGenerator;
}
protected void closeNodeConnections() {
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
for (RedisConnection connection : nodeConnections.values()) {
@ -351,7 +368,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
RFuture<RedisClient> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {
addEntry(slot, entry);
}
@ -499,14 +516,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
for (MasterSlaveEntry entry : client2entry.values()) {
if (URIBuilder.compare(entry.getClient().getAddr(), addr)) {
return entry;
}
}
if (entry.hasSlave(addr)) {
return entry;
}
}
return null;
}
@Override
public MasterSlaveEntry getEntry(RedisClient redisClient) {
MasterSlaveEntry entry = client2entry.get(redisClient);
@ -636,7 +653,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
for (MasterSlaveEntry entry : getEntrySet()) {
entry.shutdown();
}
if (cfg.getExecutor() == null) {
executor.shutdown();
try {

@ -115,7 +115,7 @@ public class MasterSlaveEntry {
}
return result;
}
public RFuture<RedisClient> setupMasterEntry(InetSocketAddress address, URI uri) {
RedisClient client = connectionManager.createClient(NodeType.MASTER, address, uri, sslHostname);
return setupMasterEntry(client);
@ -139,25 +139,25 @@ public class MasterSlaveEntry {
return;
}
masterEntry = new ClientConnectionsEntry(
client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
masterEntry = new ClientConnectionsEntry(
client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
NodeType.MASTER);
CountableListener<RedisClient> listener = new CountableListener<RedisClient>(result, client);
RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
listener.incCounter();
writeFuture.addListener(listener);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
listener.incCounter();
pubSubFuture.addListener(listener);
}
CountableListener<RedisClient> listener = new CountableListener<RedisClient>(result, client);
RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
listener.incCounter();
writeFuture.addListener(listener);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
listener.incCounter();
pubSubFuture.addListener(listener);
}
}
});
@ -306,7 +306,7 @@ public class MasterSlaveEntry {
public RFuture<Void> addSlave(InetSocketAddress address, URI uri) {
return addSlave(address, uri, false, NodeType.SLAVE);
}
private RFuture<Void> addSlave(final RedisClient client, final boolean freezed, final NodeType nodeType) {
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
@ -317,7 +317,7 @@ public class MasterSlaveEntry {
result.tryFailure(future.cause());
return;
}
ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
config.getSlaveConnectionMinimumIdleSize(),
config.getSlaveConnectionPoolSize(),
@ -335,7 +335,7 @@ public class MasterSlaveEntry {
});
return result;
}
private RFuture<Void> addSlave(InetSocketAddress address, URI uri, final boolean freezed, final NodeType nodeType) {
RedisClient client = connectionManager.createClient(NodeType.SLAVE, address, uri, sslHostname);
return addSlave(client, freezed, nodeType);
@ -346,6 +346,10 @@ public class MasterSlaveEntry {
return addSlave(client, freezed, nodeType);
}
public ClientConnectionsEntry getSlaveEntry(RedisClient client) {
return slaveBalancer.getEntry(client);
}
public Collection<ClientConnectionsEntry> getSlaveEntries() {
List<ClientConnectionsEntry> result = new ArrayList<ClientConnectionsEntry>();
for (ClientConnectionsEntry slaveEntry : slaveBalancer.getEntries()) {

@ -86,8 +86,8 @@ public class LoadBalancerManager {
CountableListener<Void> listener = new CountableListener<Void>(result, null) {
@Override
protected void onSuccess(Void value) {
client2Entry.put(entry.getClient(), entry);
}
client2Entry.put(entry.getClient(), entry);
}
};
RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);
@ -149,12 +149,12 @@ public class LoadBalancerManager {
}
return false;
}
public ClientConnectionsEntry freeze(URI address, FreezeReason freezeReason) {
ClientConnectionsEntry connectionEntry = getEntry(address);
return freeze(connectionEntry, freezeReason);
}
public ClientConnectionsEntry freeze(InetSocketAddress address, FreezeReason freezeReason) {
ClientConnectionsEntry connectionEntry = getEntry(address);
return freeze(connectionEntry, freezeReason);
@ -197,7 +197,7 @@ public class LoadBalancerManager {
public boolean contains(URI addr) {
return getEntry(addr) != null;
}
public boolean contains(RedisClient redisClient) {
return getEntry(redisClient) != null;
}
@ -222,8 +222,7 @@ public class LoadBalancerManager {
return null;
}
protected ClientConnectionsEntry getEntry(RedisClient redisClient) {
public ClientConnectionsEntry getEntry(RedisClient redisClient) {
return client2Entry.get(redisClient);
}

Loading…
Cancel
Save