diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index d2ad9c769..e23ac43e3 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -147,7 +147,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private final ExecutorService executor; private final CommandSyncService commandExecutor; - + private final Config cfg; protected final DnsAddressResolverGroup resolverGroup; @@ -186,7 +186,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } else if (cfg.getTransportMode() == TransportMode.KQUEUE) { if (cfg.getEventLoopGroup() == null) { this.group = new KQueueEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty")); - } else { + } else { this.group = cfg.getEventLoopGroup(); } @@ -218,7 +218,24 @@ public class MasterSlaveConnectionManager implements ConnectionManager { this.shutdownPromise = new RedissonPromise(); this.commandExecutor = new CommandSyncService(this); } + + /* + * Remove it once https://github.com/netty/netty/issues/7882 get resolved + */ + protected DnsAddressResolverGroup createResolverGroup() { + if (cfg.getTransportMode() == TransportMode.EPOLL) { + return cfg.getAddressResolverGroupFactory().create(EpollDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()); + } else if (cfg.getTransportMode() == TransportMode.KQUEUE) { + return cfg.getAddressResolverGroupFactory().create(KQueueDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()); + } + + return cfg.getAddressResolverGroupFactory().create(NioDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()); + } + public SlotGenerator getSlotGenerator() { + return slotGenerator; + } + protected void closeNodeConnections() { List> futures = new ArrayList>(); for (RedisConnection connection : nodeConnections.values()) { @@ -351,7 +368,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } RFuture f = entry.setupMasterEntry(config.getMasterAddress()); f.syncUninterruptibly(); - + for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) { addEntry(slot, entry); } @@ -499,14 +516,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager { for (MasterSlaveEntry entry : client2entry.values()) { if (URIBuilder.compare(entry.getClient().getAddr(), addr)) { return entry; - } + } if (entry.hasSlave(addr)) { return entry; } } return null; } - + @Override public MasterSlaveEntry getEntry(RedisClient redisClient) { MasterSlaveEntry entry = client2entry.get(redisClient); @@ -636,7 +653,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { for (MasterSlaveEntry entry : getEntrySet()) { entry.shutdown(); } - + if (cfg.getExecutor() == null) { executor.shutdown(); try { diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index b1155f0a9..119addfff 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -115,7 +115,7 @@ public class MasterSlaveEntry { } return result; } - + public RFuture setupMasterEntry(InetSocketAddress address, URI uri) { RedisClient client = connectionManager.createClient(NodeType.MASTER, address, uri, sslHostname); return setupMasterEntry(client); @@ -139,25 +139,25 @@ public class MasterSlaveEntry { return; } - masterEntry = new ClientConnectionsEntry( - client, - config.getMasterConnectionMinimumIdleSize(), - config.getMasterConnectionPoolSize(), - config.getSubscriptionConnectionMinimumIdleSize(), - config.getSubscriptionConnectionPoolSize(), - connectionManager, + masterEntry = new ClientConnectionsEntry( + client, + config.getMasterConnectionMinimumIdleSize(), + config.getMasterConnectionPoolSize(), + config.getSubscriptionConnectionMinimumIdleSize(), + config.getSubscriptionConnectionPoolSize(), + connectionManager, NodeType.MASTER); - - CountableListener listener = new CountableListener(result, client); - RFuture writeFuture = writeConnectionPool.add(masterEntry); - listener.incCounter(); - writeFuture.addListener(listener); - - if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { - RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); - listener.incCounter(); - pubSubFuture.addListener(listener); - } + + CountableListener listener = new CountableListener(result, client); + RFuture writeFuture = writeConnectionPool.add(masterEntry); + listener.incCounter(); + writeFuture.addListener(listener); + + if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { + RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); + listener.incCounter(); + pubSubFuture.addListener(listener); + } } }); @@ -306,7 +306,7 @@ public class MasterSlaveEntry { public RFuture addSlave(InetSocketAddress address, URI uri) { return addSlave(address, uri, false, NodeType.SLAVE); } - + private RFuture addSlave(final RedisClient client, final boolean freezed, final NodeType nodeType) { final RPromise result = new RedissonPromise(); RFuture addrFuture = client.resolveAddr(); @@ -317,7 +317,7 @@ public class MasterSlaveEntry { result.tryFailure(future.cause()); return; } - + ClientConnectionsEntry entry = new ClientConnectionsEntry(client, config.getSlaveConnectionMinimumIdleSize(), config.getSlaveConnectionPoolSize(), @@ -335,7 +335,7 @@ public class MasterSlaveEntry { }); return result; } - + private RFuture addSlave(InetSocketAddress address, URI uri, final boolean freezed, final NodeType nodeType) { RedisClient client = connectionManager.createClient(NodeType.SLAVE, address, uri, sslHostname); return addSlave(client, freezed, nodeType); @@ -346,6 +346,10 @@ public class MasterSlaveEntry { return addSlave(client, freezed, nodeType); } + public ClientConnectionsEntry getSlaveEntry(RedisClient client) { + return slaveBalancer.getEntry(client); + } + public Collection getSlaveEntries() { List result = new ArrayList(); for (ClientConnectionsEntry slaveEntry : slaveBalancer.getEntries()) { diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index d4d14f0d0..af5f51597 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -86,8 +86,8 @@ public class LoadBalancerManager { CountableListener listener = new CountableListener(result, null) { @Override protected void onSuccess(Void value) { - client2Entry.put(entry.getClient(), entry); - } + client2Entry.put(entry.getClient(), entry); + } }; RFuture slaveFuture = slaveConnectionPool.add(entry); @@ -149,12 +149,12 @@ public class LoadBalancerManager { } return false; } - + public ClientConnectionsEntry freeze(URI address, FreezeReason freezeReason) { ClientConnectionsEntry connectionEntry = getEntry(address); return freeze(connectionEntry, freezeReason); } - + public ClientConnectionsEntry freeze(InetSocketAddress address, FreezeReason freezeReason) { ClientConnectionsEntry connectionEntry = getEntry(address); return freeze(connectionEntry, freezeReason); @@ -197,7 +197,7 @@ public class LoadBalancerManager { public boolean contains(URI addr) { return getEntry(addr) != null; } - + public boolean contains(RedisClient redisClient) { return getEntry(redisClient) != null; } @@ -222,8 +222,7 @@ public class LoadBalancerManager { return null; } - - protected ClientConnectionsEntry getEntry(RedisClient redisClient) { + public ClientConnectionsEntry getEntry(RedisClient redisClient) { return client2Entry.get(redisClient); }