From b0381de67aa9a92b727cb616bc00f91145aa63fc Mon Sep 17 00:00:00 2001 From: Faisal Ahmed Date: Mon, 12 Mar 2018 10:04:47 +0900 Subject: [PATCH 1/4] fixed missing colon problem in user defined key prefix. --- .../main/java/org/redisson/tomcat/RedissonSessionManager.java | 3 ++- .../main/java/org/redisson/tomcat/RedissonSessionManager.java | 3 ++- .../main/java/org/redisson/tomcat/RedissonSessionManager.java | 3 ++- .../main/java/org/redisson/tomcat/RedissonSessionManager.java | 3 ++- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index 585df5ce7..29dcba552 100644 --- a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -140,7 +140,8 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle { } public RMap getMap(String sessionId) { - return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId); + String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":"; + return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId); } @Override diff --git a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index 66dd91d79..691caf322 100644 --- a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -120,7 +120,8 @@ public class RedissonSessionManager extends ManagerBase { } public RMap getMap(String sessionId) { - return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId); + String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":"; + return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId); } @Override diff --git a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index 35d4d84f9..e710753c9 100644 --- a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -119,7 +119,8 @@ public class RedissonSessionManager extends ManagerBase { } public RMap getMap(String sessionId) { - return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId); + String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":"; + return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId); } @Override diff --git a/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index b370552f5..b8da6512a 100644 --- a/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -119,7 +119,8 @@ public class RedissonSessionManager extends ManagerBase { } public RMap getMap(String sessionId) { - return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId); + String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":"; + return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId); } @Override From 9a179e14cded9e20f081ce479e47af49a02f45af Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 13 Mar 2018 12:24:20 +0300 Subject: [PATCH 2/4] Fixed - CertificateException while connecting to Azure or AWS Elasticache config endpoint. #1296 --- .../redisson/client/RedisClientConfig.java | 10 +++++++ .../handler/RedisChannelInitializer.java | 8 +++++- .../cluster/ClusterConnectionManager.java | 17 +++++++---- .../connection/ConnectionManager.java | 6 ++-- .../MasterSlaveConnectionManager.java | 23 +++++++-------- .../redisson/connection/MasterSlaveEntry.java | 15 +++++++--- .../ReplicatedConnectionManager.java | 4 +-- .../connection/SentinelConnectionManager.java | 6 ++-- .../java/org/redisson/misc/URIBuilder.java | 28 +++++++++++++------ 9 files changed, 79 insertions(+), 38 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/RedisClientConfig.java b/redisson/src/main/java/org/redisson/client/RedisClientConfig.java index a9393e433..b90f3f821 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClientConfig.java +++ b/redisson/src/main/java/org/redisson/client/RedisClientConfig.java @@ -55,6 +55,7 @@ public class RedisClientConfig { private boolean keepAlive; private boolean tcpNoDelay; + private String sslHostname; private boolean sslEnableEndpointIdentification = true; private SslProvider sslProvider = SslProvider.JDK; private URI sslTruststore; @@ -89,8 +90,17 @@ public class RedisClientConfig { this.sslKeystore = config.sslKeystore; this.sslKeystorePassword = config.sslKeystorePassword; this.resolverGroup = config.resolverGroup; + this.sslHostname = config.sslHostname; } + public String getSslHostname() { + return sslHostname; + } + public RedisClientConfig setSslHostname(String sslHostname) { + this.sslHostname = sslHostname; + return this; + } + public RedisClientConfig setAddress(String host, int port) { this.address = URIBuilder.create("redis://" + host + ":" + port); return this; diff --git a/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java b/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java index 39bd51c25..9645f6249 100644 --- a/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java +++ b/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java @@ -33,6 +33,7 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisClientConfig; import org.redisson.client.RedisConnection; import org.redisson.config.SslProvider; +import org.redisson.misc.URIBuilder; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -162,7 +163,12 @@ public class RedisChannelInitializer extends ChannelInitializer { } SslContext sslContext = sslContextBuilder.build(); - SSLEngine sslEngine = sslContext.newEngine(ch.alloc(), config.getAddress().getHost(), config.getAddress().getPort()); + String hostname = config.getSslHostname(); + if (hostname == null || URIBuilder.isValidIP(hostname)) { + hostname = config.getAddress().getHost(); + } + + SSLEngine sslEngine = sslContext.newEngine(ch.alloc(), hostname, config.getAddress().getPort()); sslEngine.setSSLParameters(sslParams); SslHandler sslHandler = new SslHandler(sslEngine); diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index de3fe9aa0..84f774545 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -82,6 +82,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private RedisStrictCommand> clusterNodesCommand; + private String configEndpointHostName; + private boolean isConfigEndpoint; private AddressResolver resolver; @@ -95,7 +97,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { Throwable lastException = null; List failedMasters = new ArrayList(); for (URI addr : cfg.getNodeAddresses()) { - RFuture connectionFuture = connectToNode(cfg, addr, null); + RFuture connectionFuture = connectToNode(cfg, addr, null, addr.getHost()); try { RedisConnection connection = connectionFuture.syncUninterruptibly().getNow(); @@ -104,6 +106,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { Future> addrsFuture = resolver.resolveAll(InetSocketAddress.createUnresolved(addr.getHost(), addr.getPort())); List allAddrs = addrsFuture.syncUninterruptibly().getNow(); if (allAddrs.size() > 1) { + configEndpointHostName = addr.getHost(); isConfigEndpoint = true; } else { resolver.close(); @@ -178,8 +181,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } @Override - protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout) { - RedisClientConfig result = super.createRedisConfig(type, address, timeout, commandTimeout); + protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname) { + RedisClientConfig result = super.createRedisConfig(type, address, timeout, commandTimeout, sslHostname); result.setReadOnly(type == NodeType.SLAVE && config.getReadMode() != ReadMode.MASTER); return result; } @@ -198,7 +201,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } final RPromise>> result = new RedissonPromise>>(); - RFuture connectionFuture = connectToNode(cfg, partition.getMasterAddress(), null); + RFuture connectionFuture = connectToNode(cfg, partition.getMasterAddress(), null, configEndpointHostName); connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -351,7 +354,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return; } final URI uri = iterator.next(); - RFuture connectionFuture = connectToNode(cfg, uri, null); + RFuture connectionFuture = connectToNode(cfg, uri, null, configEndpointHostName); connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -651,6 +654,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } } } + + public String getConfigEndpointHostName() { + return configEndpointHostName; + } @Override public int calcSlot(String key) { diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index d6427cb24..b7ffa7c8e 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -97,11 +97,11 @@ public interface ConnectionManager { RFuture connectionWriteOp(NodeSource source, RedisCommand command); - RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout); + RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname); - RedisClient createClient(NodeType type, InetSocketAddress address, URI uri); + RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, String sslHostname); - RedisClient createClient(NodeType type, URI address); + RedisClient createClient(NodeType type, URI address, String sslHostname); MasterSlaveEntry getEntry(RedisClient redisClient); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 10bfdebce..d73dfbc40 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -250,7 +250,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - protected RFuture connectToNode(BaseMasterSlaveServersConfig cfg, final URI addr, RedisClient client) { + protected RFuture connectToNode(BaseMasterSlaveServersConfig cfg, final URI addr, RedisClient client, String sslHostname) { final Object key; if (client != null) { key = client; @@ -263,7 +263,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } if (addr != null) { - client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts()); + client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts(), sslHostname); } final RPromise result = new RedissonPromise(); RFuture future = client.connectAsync(); @@ -430,15 +430,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public RedisClient createClient(NodeType type, URI address) { - RedisClient client = createClient(type, address, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts()); + public RedisClient createClient(NodeType type, URI address, String sslHostname) { + RedisClient client = createClient(type, address, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts(), sslHostname); clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type)); return client; } @Override - public RedisClient createClient(NodeType type, InetSocketAddress address, URI uri) { - RedisClient client = createClient(type, address, uri, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts()); + public RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, String sslHostname) { + RedisClient client = createClient(type, address, uri, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts(), sslHostname); clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type)); return client; } @@ -452,19 +452,19 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout) { - RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout); + public RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname) { + RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname); return RedisClient.create(redisConfig); } - private RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, int timeout, int commandTimeout) { - RedisClientConfig redisConfig = createRedisConfig(type, null, timeout, commandTimeout); + private RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, int timeout, int commandTimeout, String sslHostname) { + RedisClientConfig redisConfig = createRedisConfig(type, null, timeout, commandTimeout, sslHostname); redisConfig.setAddress(address, uri); return RedisClient.create(redisConfig); } - protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout) { + protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname) { RedisClientConfig redisConfig = new RedisClientConfig(); redisConfig.setAddress(address) .setTimer(timer) @@ -474,6 +474,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { .setSocketChannelClass(socketChannelClass) .setConnectTimeout(timeout) .setCommandTimeout(commandTimeout) + .setSslHostname(sslHostname) .setSslEnableEndpointIdentification(config.isSslEnableEndpointIdentification()) .setSslProvider(config.getSslProvider()) .setSslTruststore(config.getSslTruststore()) diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index c2d49c4e1..c795b7a90 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -35,6 +35,7 @@ import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.pubsub.PubSubType; +import org.redisson.cluster.ClusterConnectionManager; import org.redisson.cluster.ClusterSlotRange; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.ReadMode; @@ -77,6 +78,8 @@ public class MasterSlaveEntry { final AtomicBoolean active = new AtomicBoolean(true); + String sslHostname; + public MasterSlaveEntry(Set slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { for (ClusterSlotRange clusterSlotRange : slotRanges) { for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) { @@ -89,6 +92,10 @@ public class MasterSlaveEntry { slaveBalancer = new LoadBalancerManager(config, connectionManager, this); writeConnectionPool = new MasterConnectionPool(config, connectionManager, this); pubSubConnectionPool = new MasterPubSubConnectionPool(config, connectionManager, this); + + if (connectionManager instanceof ClusterConnectionManager) { + sslHostname = ((ClusterConnectionManager) connectionManager).getConfigEndpointHostName(); + } } public MasterSlaveServersConfig getConfig() { @@ -111,13 +118,13 @@ public class MasterSlaveEntry { } public RFuture setupMasterEntry(InetSocketAddress address, URI uri) { - RedisClient client = connectionManager.createClient(NodeType.MASTER, address, uri); + RedisClient client = connectionManager.createClient(NodeType.MASTER, address, uri, sslHostname); return setupMasterEntry(client); } public RFuture setupMasterEntry(URI address) { - RedisClient client = connectionManager.createClient(NodeType.MASTER, address); + RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname); return setupMasterEntry(client); } @@ -404,12 +411,12 @@ public class MasterSlaveEntry { } private RFuture addSlave(InetSocketAddress address, URI uri, final boolean freezed, final NodeType nodeType) { - RedisClient client = connectionManager.createClient(NodeType.SLAVE, address, uri); + RedisClient client = connectionManager.createClient(NodeType.SLAVE, address, uri, sslHostname); return addSlave(client, freezed, nodeType); } private RFuture addSlave(URI address, final boolean freezed, final NodeType nodeType) { - RedisClient client = connectionManager.createClient(NodeType.SLAVE, address); + RedisClient client = connectionManager.createClient(NodeType.SLAVE, address, sslHostname); return addSlave(client, freezed, nodeType); } diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java index 9675a1683..bd0d6148f 100644 --- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java @@ -67,7 +67,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { initTimer(this.config); for (URI addr : cfg.getNodeAddresses()) { - RFuture connectionFuture = connectToNode(cfg, addr, null); + RFuture connectionFuture = connectToNode(cfg, addr, null, addr.getHost()); connectionFuture.awaitUninterruptibly(); RedisConnection connection = connectionFuture.getNow(); if (connection == null) { @@ -119,7 +119,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { return; } - RFuture connectionFuture = connectToNode(cfg, addr, null); + RFuture connectionFuture = connectToNode(cfg, addr, null, addr.getHost()); connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index e9ca00f51..8a607da2e 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -90,7 +90,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { this.sentinelResolver = resolverGroup.getResolver(getGroup().next()); for (URI addr : cfg.getSentinelAddresses()) { - RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts()); + RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts(), null); try { RedisConnection connection = client.connect(); if (!connection.isActive()) { @@ -261,7 +261,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } RedisClient client = iterator.next(); - RFuture connectionFuture = connectToNode(null, null, client); + RFuture connectionFuture = connectToNode(null, null, client, null); connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -433,7 +433,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return RedissonPromise.newSucceededFuture(null); } - client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts()); + client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts(), null); RedisClient oldClient = sentinels.putIfAbsent(key, client); if (oldClient != null) { return RedissonPromise.newSucceededFuture(null); diff --git a/redisson/src/main/java/org/redisson/misc/URIBuilder.java b/redisson/src/main/java/org/redisson/misc/URIBuilder.java index f25511e7d..3a9476d2f 100644 --- a/redisson/src/main/java/org/redisson/misc/URIBuilder.java +++ b/redisson/src/main/java/org/redisson/misc/URIBuilder.java @@ -17,34 +17,44 @@ package org.redisson.misc; import java.net.InetSocketAddress; import java.net.URI; +import java.util.regex.Pattern; /** * * @author Rui Gu (https://github.com/jackygurui) */ public class URIBuilder { - + + private static final Pattern ipv4Pattern = Pattern.compile("(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])", Pattern.CASE_INSENSITIVE); + private static final Pattern ipv6Pattern = Pattern.compile("([0-9a-f]{1,4}:){7}([0-9a-f]){1,4}", Pattern.CASE_INSENSITIVE); + public static URI create(String uri) { URI u = URI.create(uri); - //Let's assuming most of the time it is OK. + // Let's assuming most of the time it is OK. if (u.getHost() != null) { return u; } - String s = uri.substring(0, uri.lastIndexOf(":")) - .replaceFirst("redis://", "") - .replaceFirst("rediss://", ""); - //Assuming this is an IPv6 format, other situations will be handled by - //Netty at a later stage. + String s = uri.substring(0, uri.lastIndexOf(":")).replaceFirst("redis://", "").replaceFirst("rediss://", ""); + // Assuming this is an IPv6 format, other situations will be handled by + // Netty at a later stage. return URI.create(uri.replace(s, "[" + s + "]")); } + public static boolean isValidIP(String host) { + if (ipv4Pattern.matcher(host).matches()) { + return true; + } + + return ipv6Pattern.matcher(host).matches(); + } + public static boolean compare(InetSocketAddress entryAddr, URI addr) { if (((entryAddr.getHostName() != null && entryAddr.getHostName().equals(addr.getHost())) || entryAddr.getAddress().getHostAddress().equals(addr.getHost())) - && entryAddr.getPort() == addr.getPort()) { + && entryAddr.getPort() == addr.getPort()) { return true; } return false; } - + } From ac5365a098f467b04eacce35c458dd3a5a382367 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 13 Mar 2018 19:40:21 +0300 Subject: [PATCH 3/4] refactoring --- .../org/redisson/RedissonCountDownLatch.java | 4 +- .../java/org/redisson/RedissonFairLock.java | 4 +- .../main/java/org/redisson/RedissonLock.java | 4 +- .../org/redisson/RedissonPatternTopic.java | 23 +- .../RedissonPermitExpirableSemaphore.java | 4 +- .../java/org/redisson/RedissonSemaphore.java | 4 +- .../main/java/org/redisson/RedissonTopic.java | 25 +- .../connection/ConnectionManager.java | 24 +- .../connection/FutureConnectionListener.java | 79 --- .../MasterSlaveConnectionManager.java | 342 +------------ .../redisson/connection/MasterSlaveEntry.java | 88 +--- .../org/redisson/pubsub/PublishSubscribe.java | 12 +- .../pubsub/PublishSubscribeService.java | 463 ++++++++++++++++++ .../RedissonPatternTopicReactive.java | 11 +- .../WeightedRoundRobinBalancerTest.java | 3 +- 15 files changed, 536 insertions(+), 554 deletions(-) delete mode 100644 redisson/src/main/java/org/redisson/connection/FutureConnectionListener.java create mode 100644 redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java diff --git a/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java b/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java index d6eeb1e31..ddb4b759b 100644 --- a/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -106,11 +106,11 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown } private RFuture subscribe() { - return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager()); + return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); } private void unsubscribe(RFuture future) { - PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager()); + PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonFairLock.java b/redisson/src/main/java/org/redisson/RedissonFairLock.java index fa4210b41..dd040f656 100644 --- a/redisson/src/main/java/org/redisson/RedissonFairLock.java +++ b/redisson/src/main/java/org/redisson/RedissonFairLock.java @@ -59,13 +59,13 @@ public class RedissonFairLock extends RedissonLock implements RLock { @Override protected RFuture subscribe(long threadId) { return PUBSUB.subscribe(getEntryName() + ":" + threadId, - getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager()); + getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager().getSubscribeService()); } @Override protected void unsubscribe(RFuture future, long threadId) { PUBSUB.unsubscribe(future.getNow(), getEntryName() + ":" + threadId, - getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager()); + getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager().getSubscribeService()); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index 35ef6b747..023f9b267 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -348,11 +348,11 @@ public class RedissonLock extends RedissonExpirable implements RLock { } protected RFuture subscribe(long threadId) { - return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager()); + return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); } protected void unsubscribe(RFuture future, long threadId) { - PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager()); + PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java index c5632b2ca..f6f15cb4e 100644 --- a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java @@ -29,6 +29,7 @@ import org.redisson.command.CommandExecutor; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.PubSubConnectionEntry; import org.redisson.pubsub.AsyncSemaphore; +import org.redisson.pubsub.PublishSubscribeService; /** * Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster. @@ -39,6 +40,7 @@ import org.redisson.pubsub.AsyncSemaphore; */ public class RedissonPatternTopic implements RPatternTopic { + final PublishSubscribeService subscribeService; final CommandExecutor commandExecutor; private final String name; private final Codec codec; @@ -51,6 +53,7 @@ public class RedissonPatternTopic implements RPatternTopic { this.commandExecutor = commandExecutor; this.name = name; this.codec = codec; + this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService(); } @Override @@ -65,7 +68,7 @@ public class RedissonPatternTopic implements RPatternTopic { } private int addListener(RedisPubSubListener pubSubListener) { - RFuture future = commandExecutor.getConnectionManager().psubscribe(name, codec, pubSubListener); + RFuture future = subscribeService.psubscribe(name, codec, pubSubListener); commandExecutor.syncSubscription(future); return System.identityHashCode(pubSubListener); } @@ -80,10 +83,10 @@ public class RedissonPatternTopic implements RPatternTopic { @Override public void removeListener(int listenerId) { - AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + AsyncSemaphore semaphore = subscribeService.getSemaphore(name); acquire(semaphore); - PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); + PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name); if (entry == null) { semaphore.release(); return; @@ -91,7 +94,7 @@ public class RedissonPatternTopic implements RPatternTopic { entry.removeListener(name, listenerId); if (!entry.hasListeners(name)) { - commandExecutor.getConnectionManager().punsubscribe(name, semaphore); + subscribeService.punsubscribe(name, semaphore); } else { semaphore.release(); } @@ -99,10 +102,10 @@ public class RedissonPatternTopic implements RPatternTopic { @Override public void removeAllListeners() { - AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + AsyncSemaphore semaphore = subscribeService.getSemaphore(name); acquire(semaphore); - PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); + PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name); if (entry == null) { semaphore.release(); return; @@ -110,7 +113,7 @@ public class RedissonPatternTopic implements RPatternTopic { entry.removeAllListeners(name); if (!entry.hasListeners(name)) { - commandExecutor.getConnectionManager().punsubscribe(name, semaphore); + subscribeService.punsubscribe(name, semaphore); } else { semaphore.release(); } @@ -118,10 +121,10 @@ public class RedissonPatternTopic implements RPatternTopic { @Override public void removeListener(PatternMessageListener listener) { - AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + AsyncSemaphore semaphore = subscribeService.getSemaphore(name); acquire(semaphore); - PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); + PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name); if (entry == null) { semaphore.release(); return; @@ -129,7 +132,7 @@ public class RedissonPatternTopic implements RPatternTopic { entry.removeListener(name, listener); if (!entry.hasListeners(name)) { - commandExecutor.getConnectionManager().punsubscribe(name, semaphore); + subscribeService.punsubscribe(name, semaphore); } else { semaphore.release(); } diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java index 131f1acee..8c4bd09cc 100644 --- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java @@ -587,11 +587,11 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen } private RFuture subscribe() { - return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager()); + return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); } private void unsubscribe(RFuture future) { - semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager()); + semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java index 0026aa224..c78bb59f4 100644 --- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java @@ -437,11 +437,11 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { } private RFuture subscribe() { - return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager()); + return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); } private void unsubscribe(RFuture future) { - semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager()); + semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 12245d58e..9d1b8083a 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -34,6 +34,7 @@ import org.redisson.misc.RPromise; import org.redisson.misc.RedissonObjectFactory; import org.redisson.misc.RedissonPromise; import org.redisson.pubsub.AsyncSemaphore; +import org.redisson.pubsub.PublishSubscribeService; import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.Future; @@ -48,6 +49,7 @@ import io.netty.util.concurrent.FutureListener; */ public class RedissonTopic implements RTopic { + final PublishSubscribeService subscribeService; final CommandAsyncExecutor commandExecutor; private final String name; private final Codec codec; @@ -60,6 +62,7 @@ public class RedissonTopic implements RTopic { this.commandExecutor = commandExecutor; this.name = name; this.codec = codec; + this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService(); } public List getChannelNames() { @@ -103,13 +106,13 @@ public class RedissonTopic implements RTopic { } private int addListener(RedisPubSubListener pubSubListener) { - RFuture future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener); + RFuture future = subscribeService.subscribe(codec, name, pubSubListener); commandExecutor.syncSubscription(future); return System.identityHashCode(pubSubListener); } public RFuture addListenerAsync(final RedisPubSubListener pubSubListener) { - RFuture future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener); + RFuture future = subscribeService.subscribe(codec, name, pubSubListener); final RPromise result = new RedissonPromise(); future.addListener(new FutureListener() { @Override @@ -127,10 +130,10 @@ public class RedissonTopic implements RTopic { @Override public void removeAllListeners() { - AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + AsyncSemaphore semaphore = subscribeService.getSemaphore(name); acquire(semaphore); - PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); + PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name); if (entry == null) { semaphore.release(); return; @@ -138,7 +141,7 @@ public class RedissonTopic implements RTopic { entry.removeAllListeners(name); if (!entry.hasListeners(name)) { - commandExecutor.getConnectionManager().unsubscribe(name, semaphore); + subscribeService.unsubscribe(name, semaphore); } else { semaphore.release(); } @@ -154,10 +157,10 @@ public class RedissonTopic implements RTopic { @Override public void removeListener(MessageListener listener) { - AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + AsyncSemaphore semaphore = subscribeService.getSemaphore(name); acquire(semaphore); - PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); + PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name); if (entry == null) { semaphore.release(); return; @@ -165,7 +168,7 @@ public class RedissonTopic implements RTopic { entry.removeListener(name, listener); if (!entry.hasListeners(name)) { - commandExecutor.getConnectionManager().unsubscribe(name, semaphore); + subscribeService.unsubscribe(name, semaphore); } else { semaphore.release(); } @@ -174,10 +177,10 @@ public class RedissonTopic implements RTopic { @Override public void removeListener(int listenerId) { - AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + AsyncSemaphore semaphore = subscribeService.getSemaphore(name); acquire(semaphore); - PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); + PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name); if (entry == null) { semaphore.release(); return; @@ -185,7 +188,7 @@ public class RedissonTopic implements RTopic { entry.removeListener(name, listenerId); if (!entry.hasListeners(name)) { - commandExecutor.getConnectionManager().unsubscribe(name, semaphore); + subscribeService.unsubscribe(name, semaphore); } else { semaphore.release(); } diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index b7ffa7c8e..fd867757d 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -26,15 +26,13 @@ import org.redisson.api.NodeType; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; -import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.command.CommandSyncService; import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.misc.InfinitySemaphoreLatch; -import org.redisson.pubsub.AsyncSemaphore; +import org.redisson.pubsub.PublishSubscribeService; import io.netty.channel.EventLoopGroup; import io.netty.util.Timeout; @@ -51,6 +49,8 @@ public interface ConnectionManager { CommandSyncService getCommandExecutor(); + PublishSubscribeService getSubscribeService(); + ExecutorService getExecutor(); URI getLastClusterNode(); @@ -59,17 +59,11 @@ public interface ConnectionManager { boolean isClusterMode(); - AsyncSemaphore getSemaphore(String channelName); - ConnectionEventsHub getConnectionEventsHub(); boolean isShutdown(); boolean isShuttingDown(); - - RFuture subscribe(Codec codec, String channelName, RedisPubSubListener... listeners); - - RFuture subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener... listeners); IdleConnectionWatcher getConnectionWatcher(); @@ -105,18 +99,6 @@ public interface ConnectionManager { MasterSlaveEntry getEntry(RedisClient redisClient); - PubSubConnectionEntry getPubSubEntry(String channelName); - - RFuture psubscribe(String pattern, Codec codec, RedisPubSubListener... listeners); - - RFuture psubscribe(String pattern, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener... listeners); - - void unsubscribe(String channelName, AsyncSemaphore lock); - - RFuture unsubscribe(String channelName, PubSubType topicType); - - void punsubscribe(String channelName, AsyncSemaphore lock); - void shutdown(); void shutdown(long quietPeriod, long timeout, TimeUnit unit); diff --git a/redisson/src/main/java/org/redisson/connection/FutureConnectionListener.java b/redisson/src/main/java/org/redisson/connection/FutureConnectionListener.java deleted file mode 100644 index 4e58cecd3..000000000 --- a/redisson/src/main/java/org/redisson/connection/FutureConnectionListener.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.connection; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import org.redisson.api.RFuture; -import org.redisson.client.RedisConnection; -import org.redisson.client.protocol.RedisCommand; -import org.redisson.misc.RPromise; - -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; - -public class FutureConnectionListener implements FutureListener { - - private final AtomicInteger commandsCounter = new AtomicInteger(); - - private final RPromise connectionPromise; - private final T connection; - private final List commands = new ArrayList(4); - - public FutureConnectionListener(RPromise connectionFuture, T connection) { - super(); - this.connectionPromise = connectionFuture; - this.connection = connection; - } - - public void addCommand(final RedisCommand command, final Object ... params) { - commandsCounter.incrementAndGet(); - commands.add(new Runnable() { - @Override - public void run() { - RFuture future = connection.async(command, params); - future.addListener(FutureConnectionListener.this); - } - }); - } - - public void executeCommands() { - if (commands.isEmpty()) { - connectionPromise.trySuccess(connection); - return; - } - - for (Runnable command : commands) { - command.run(); - } - commands.clear(); - } - - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - connection.closeAsync(); - connectionPromise.tryFailure(future.cause()); - return; - } - if (commandsCounter.decrementAndGet() == 0) { - connectionPromise.trySuccess(connection); - } - } - -} diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index d73dfbc40..0d7a9d411 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -25,10 +25,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -37,19 +34,13 @@ import java.util.concurrent.atomic.AtomicReferenceArray; import org.redisson.Version; import org.redisson.api.NodeType; import org.redisson.api.RFuture; -import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.RedisClient; import org.redisson.client.RedisClientConfig; import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; import org.redisson.client.RedisNodeNotFoundException; -import org.redisson.client.RedisPubSubConnection; -import org.redisson.client.RedisPubSubListener; -import org.redisson.client.RedisTimeoutException; -import org.redisson.client.SubscribeListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.cluster.ClusterSlotRange; import org.redisson.command.CommandSyncService; import org.redisson.config.BaseMasterSlaveServersConfig; @@ -61,6 +52,7 @@ import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.misc.URIBuilder; import org.redisson.pubsub.AsyncSemaphore; +import org.redisson.pubsub.PublishSubscribeService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,10 +128,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected final Class socketChannelClass; - protected final ConcurrentMap name2PubSubConnection = PlatformDependent.newConcurrentHashMap(); - - protected final Queue freePubSubConnections = new ConcurrentLinkedQueue(); - protected DNSMonitor dnsMonitor; protected MasterSlaveServersConfig config; @@ -161,14 +149,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private final ExecutorService executor; - private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1); - private final CommandSyncService commandExecutor; private final Config cfg; protected final DnsAddressResolverGroup resolverGroup; + private PublishSubscribeService subscribeService; + private final Map nodeConnections = PlatformDependent.newConcurrentHashMap(); { @@ -179,8 +167,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) { this(config); - initTimer(cfg); this.config = cfg; + + initTimer(cfg); initSingleEntry(); } @@ -348,6 +337,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } connectionWatcher = new IdleConnectionWatcher(this, config); + subscribeService = new PublishSubscribeService(this, config); } protected void initSingleEntry() { @@ -500,304 +490,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return singleSlotRange.getStartSlot(); } - @Override - public PubSubConnectionEntry getPubSubEntry(String channelName) { - return name2PubSubConnection.get(channelName); - } - - @Override - public RFuture psubscribe(String channelName, Codec codec, RedisPubSubListener... listeners) { - return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, new RedissonPromise(), listeners); - } - - @Override - public RFuture psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener... listeners) { - RPromise promise = new RedissonPromise(); - subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners); - return promise; - } - - @Override - public RFuture subscribe(Codec codec, String channelName, RedisPubSubListener... listeners) { - return subscribe(PubSubType.SUBSCRIBE, codec, channelName, new RedissonPromise(), listeners); - } - - private RFuture subscribe(final PubSubType type, final Codec codec, final String channelName, - final RPromise promise, final RedisPubSubListener... listeners) { - final AsyncSemaphore lock = getSemaphore(channelName); - lock.acquire(new Runnable() { - @Override - public void run() { - if (promise.isDone()) { - lock.release(); - return; - } - - final RPromise result = new RedissonPromise(); - result.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - subscribe(type, codec, channelName, promise, listeners); - return; - } - - promise.trySuccess(result.getNow()); - } - }); - subscribe(codec, channelName, result, type, lock, listeners); - } - }); - return promise; - } - - public RFuture subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener... listeners) { - RPromise promise = new RedissonPromise(); - subscribe(codec, channelName, promise, PubSubType.SUBSCRIBE, semaphore, listeners); - return promise; - } - - public AsyncSemaphore getSemaphore(String channelName) { - return locks[Math.abs(channelName.hashCode() % locks.length)]; - } - - private void subscribe(final Codec codec, final String channelName, - final RPromise promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener... listeners) { - final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName); - if (connEntry != null) { - subscribe(channelName, promise, type, lock, connEntry, listeners); - return; - } - - freePubSubLock.acquire(new Runnable() { - - @Override - public void run() { - if (promise.isDone()) { - lock.release(); - freePubSubLock.release(); - return; - } - - final PubSubConnectionEntry freeEntry = freePubSubConnections.peek(); - if (freeEntry == null) { - connect(codec, channelName, promise, type, lock, listeners); - return; - } - - int remainFreeAmount = freeEntry.tryAcquire(); - if (remainFreeAmount == -1) { - throw new IllegalStateException(); - } - - final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry); - if (oldEntry != null) { - freeEntry.release(); - freePubSubLock.release(); - - subscribe(channelName, promise, type, lock, oldEntry, listeners); - return; - } - - if (remainFreeAmount == 0) { - freePubSubConnections.poll(); - } - freePubSubLock.release(); - - subscribe(channelName, promise, type, lock, freeEntry, listeners); - - if (PubSubType.PSUBSCRIBE == type) { - freeEntry.psubscribe(codec, channelName); - } else { - freeEntry.subscribe(codec, channelName); - } - } - - }); - } - - private void subscribe(final String channelName, final RPromise promise, - final PubSubType type, final AsyncSemaphore lock, final PubSubConnectionEntry connEntry, - final RedisPubSubListener... listeners) { - for (RedisPubSubListener listener : listeners) { - connEntry.addListener(channelName, listener); - } - SubscribeListener listener = connEntry.getSubscribeFuture(channelName, type); - final Future subscribeFuture = listener.getSuccessFuture(); - - subscribeFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!promise.trySuccess(connEntry)) { - for (RedisPubSubListener listener : listeners) { - connEntry.removeListener(channelName, listener); - } - if (!connEntry.hasListeners(channelName)) { - unsubscribe(channelName, lock); - } else { - lock.release(); - } - } else { - lock.release(); - } - } - }); - - newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - if (promise.tryFailure(new RedisTimeoutException())) { - subscribeFuture.cancel(false); - } - } - }, config.getRetryInterval(), TimeUnit.MILLISECONDS); - } - - private void connect(final Codec codec, final String channelName, - final RPromise promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener... listeners) { - final int slot = calcSlot(channelName); - RFuture connFuture = nextPubSubConnection(slot); - connFuture.addListener(new FutureListener() { - - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - freePubSubLock.release(); - lock.release(); - promise.tryFailure(future.cause()); - return; - } - - RedisPubSubConnection conn = future.getNow(); - - final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); - entry.tryAcquire(); - - final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); - if (oldEntry != null) { - releaseSubscribeConnection(slot, entry); - - freePubSubLock.release(); - - subscribe(channelName, promise, type, lock, oldEntry, listeners); - return; - } - - freePubSubConnections.add(entry); - freePubSubLock.release(); - - subscribe(channelName, promise, type, lock, entry, listeners); - - if (PubSubType.PSUBSCRIBE == type) { - entry.psubscribe(codec, channelName); - } else { - entry.subscribe(codec, channelName); - } - - } - }); - } - - public void unsubscribe(final String channelName, final AsyncSemaphore lock) { - final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); - if (entry == null) { - lock.release(); - return; - } - - entry.unsubscribe(channelName, new BaseRedisPubSubListener() { - - @Override - public boolean onStatus(PubSubType type, String channel) { - if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) { - - if (entry.release() == 1) { - freePubSubConnections.add(entry); - } - - lock.release(); - return true; - } - return false; - } - - }); - } - - @Override - public RFuture unsubscribe(final String channelName, final PubSubType topicType) { - final RPromise result = new RedissonPromise(); - final AsyncSemaphore lock = getSemaphore(channelName); - lock.acquire(new Runnable() { - @Override - public void run() { - final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); - if (entry == null) { - lock.release(); - result.trySuccess(null); - return; - } - - freePubSubLock.acquire(new Runnable() { - @Override - public void run() { - freePubSubConnections.remove(entry); - freePubSubLock.release(); - - final Codec entryCodec = entry.getConnection().getChannels().get(channelName); - RedisPubSubListener listener = new BaseRedisPubSubListener() { - - @Override - public boolean onStatus(PubSubType type, String channel) { - if (type == topicType && channel.equals(channelName)) { - lock.release(); - result.trySuccess(entryCodec); - return true; - } - return false; - } - - }; - - if (topicType == PubSubType.PUNSUBSCRIBE) { - entry.punsubscribe(channelName, listener); - } else { - entry.unsubscribe(channelName, listener); - } - } - }); - } - }); - - return result; - } - - @Override - public void punsubscribe(final String channelName, final AsyncSemaphore lock) { - final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); - if (entry == null) { - lock.release(); - return; - } - - entry.punsubscribe(channelName, new BaseRedisPubSubListener() { - - @Override - public boolean onStatus(PubSubType type, String channel) { - if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) { - - if (entry.release() == 1) { - freePubSubConnections.add(entry); - } - - lock.release(); - return true; - } - return false; - } - - }); - } @Override public MasterSlaveEntry getEntry(InetSocketAddress address) { @@ -904,24 +596,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return entry.connectionReadOp(command); } - RFuture nextPubSubConnection(int slot) { - MasterSlaveEntry entry = getEntry(slot); - if (entry == null) { - RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for slot: " + slot + " hasn't been discovered yet"); - return RedissonPromise.newFailedFuture(ex); - } - return entry.nextPubSubConnection(); - } - - protected void releaseSubscribeConnection(int slot, PubSubConnectionEntry pubSubEntry) { - MasterSlaveEntry entry = getEntry(slot); - if (entry == null) { - log.error("Node for slot: " + slot + " can't be found"); - } else { - entry.returnPubSubConnection(pubSubEntry); - } - } - @Override public void releaseWrite(NodeSource source, RedisConnection connection) { MasterSlaveEntry entry = getEntry(source); @@ -1036,6 +710,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { group.shutdownGracefully().syncUninterruptibly(); } + public PublishSubscribeService getSubscribeService() { + return subscribeService; + } + public ExecutorService getExecutor() { return executor; } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index c795b7a90..62d250165 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -48,6 +48,7 @@ import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.misc.TransferListener; import org.redisson.misc.URIBuilder; +import org.redisson.pubsub.PublishSubscribeService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -171,7 +172,7 @@ public class MasterSlaveEntry { return false; } - return slaveDown(entry, freezeReason == FreezeReason.SYSTEM); + return slaveDown(entry); } public boolean slaveDown(InetSocketAddress address, FreezeReason freezeReason) { @@ -180,7 +181,7 @@ public class MasterSlaveEntry { return false; } - return slaveDown(entry, freezeReason == FreezeReason.SYSTEM); + return slaveDown(entry); } public boolean slaveDown(URI address, FreezeReason freezeReason) { @@ -189,10 +190,10 @@ public class MasterSlaveEntry { return false; } - return slaveDown(entry, freezeReason == FreezeReason.SYSTEM); + return slaveDown(entry); } - private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) { + private boolean slaveDown(ClientConnectionsEntry entry) { // add master as slave if no more slaves available if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) { if (slaveBalancer.unfreeze(masterEntry.getClient().getAddr(), FreezeReason.SYSTEM)) { @@ -205,7 +206,7 @@ public class MasterSlaveEntry { closeConnections(entry); for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) { - reattachPubSub(connection, temporaryDown); + connectionManager.getSubscribeService().reattachPubSub(connection); } entry.getAllSubscribeConnections().clear(); @@ -238,79 +239,6 @@ public class MasterSlaveEntry { } } - private void reattachPubSub(RedisPubSubConnection redisPubSubConnection, boolean temporaryDown) { - for (String channelName : redisPubSubConnection.getChannels().keySet()) { - PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName); - Collection> listeners = pubSubEntry.getListeners(channelName); - reattachPubSubListeners(channelName, listeners, PubSubType.UNSUBSCRIBE); - } - - for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) { - PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName); - Collection> listeners = pubSubEntry.getListeners(channelName); - reattachPubSubListeners(channelName, listeners, PubSubType.PUNSUBSCRIBE); - } - } - - private void reattachPubSubListeners(final String channelName, final Collection> listeners, final PubSubType topicType) { - RFuture subscribeCodec = connectionManager.unsubscribe(channelName, topicType); - if (listeners.isEmpty()) { - return; - } - - subscribeCodec.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.get() == null) { - return; - } - - Codec subscribeCodec = future.get(); - if (topicType == PubSubType.PUNSUBSCRIBE) { - psubscribe(channelName, listeners, subscribeCodec); - } else { - subscribe(channelName, listeners, subscribeCodec); - } - } - - }); - } - - private void subscribe(final String channelName, final Collection> listeners, - final Codec subscribeCodec) { - RFuture subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[listeners.size()])); - subscribeFuture.addListener(new FutureListener() { - - @Override - public void operationComplete(Future future) - throws Exception { - if (!future.isSuccess()) { - subscribe(channelName, listeners, subscribeCodec); - return; - } - - log.info("listeners of '{}' channel to '{}' have been resubscribed", channelName, future.getNow().getConnection().getRedisClient()); - } - }); - } - - private void psubscribe(final String channelName, final Collection> listeners, - final Codec subscribeCodec) { - RFuture subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()])); - subscribeFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) - throws Exception { - if (!future.isSuccess()) { - psubscribe(channelName, listeners, subscribeCodec); - return; - } - - log.debug("resubscribed listeners for '{}' channel-pattern to '{}'", channelName, future.getNow().getConnection().getRedisClient()); - } - }); - } - private void reattachBlockingQueue(RedisConnection connection) { final CommandData commandData = connection.getCurrentCommand(); @@ -510,7 +438,7 @@ public class MasterSlaveEntry { pubSubConnectionPool.remove(oldMaster); oldMaster.freezeMaster(FreezeReason.MANAGER); - slaveDown(oldMaster, false); + slaveDown(oldMaster); slaveBalancer.changeType(oldMaster.getClient(), NodeType.SLAVE); slaveBalancer.changeType(newMasterClient, NodeType.MASTER); @@ -573,7 +501,7 @@ public class MasterSlaveEntry { return slaveBalancer.getConnection(command, addr); } - RFuture nextPubSubConnection() { + public RFuture nextPubSubConnection() { if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { return pubSubConnectionPool.get(); } diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java index 2ff1ef5b0..bb5b2255c 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java @@ -40,8 +40,8 @@ abstract class PublishSubscribe> { private final ConcurrentMap entries = PlatformDependent.newConcurrentHashMap(); - public void unsubscribe(final E entry, final String entryName, final String channelName, final ConnectionManager connectionManager) { - final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName); + public void unsubscribe(final E entry, final String entryName, final String channelName, final PublishSubscribeService subscribeService) { + final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); semaphore.acquire(new Runnable() { @Override public void run() { @@ -51,7 +51,7 @@ abstract class PublishSubscribe> { if (!removed) { throw new IllegalStateException(); } - connectionManager.unsubscribe(channelName, semaphore); + subscribeService.unsubscribe(channelName, semaphore); } else { semaphore.release(); } @@ -64,9 +64,9 @@ abstract class PublishSubscribe> { return entries.get(entryName); } - public RFuture subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) { + public RFuture subscribe(final String entryName, final String channelName, final PublishSubscribeService subscribeService) { final AtomicReference listenerHolder = new AtomicReference(); - final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName); + final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); final RPromise newPromise = new RedissonPromise() { @Override public boolean cancel(boolean mayInterruptIfRunning) { @@ -98,7 +98,7 @@ abstract class PublishSubscribe> { } RedisPubSubListener listener = createListener(channelName, value); - connectionManager.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener); + subscribeService.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener); } }; semaphore.acquire(listener); diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java new file mode 100644 index 000000000..985247e99 --- /dev/null +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -0,0 +1,463 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.pubsub; + +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import org.redisson.api.RFuture; +import org.redisson.client.BaseRedisPubSubListener; +import org.redisson.client.RedisNodeNotFoundException; +import org.redisson.client.RedisPubSubConnection; +import org.redisson.client.RedisPubSubListener; +import org.redisson.client.RedisTimeoutException; +import org.redisson.client.SubscribeListener; +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.pubsub.PubSubType; +import org.redisson.config.MasterSlaveServersConfig; +import org.redisson.connection.ConnectionManager; +import org.redisson.connection.MasterSlaveEntry; +import org.redisson.connection.PubSubConnectionEntry; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.internal.PlatformDependent; + +/** + * + * @author Nikita Koksharov + * + */ +public class PublishSubscribeService { + + private static final Logger log = LoggerFactory.getLogger(PublishSubscribeService.class); + + private final ConnectionManager connectionManager; + + private final MasterSlaveServersConfig config; + + private final AsyncSemaphore[] locks = new AsyncSemaphore[50]; + + private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1); + + protected final ConcurrentMap name2PubSubConnection = PlatformDependent.newConcurrentHashMap(); + + protected final Queue freePubSubConnections = new ConcurrentLinkedQueue(); + + public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) { + super(); + this.connectionManager = connectionManager; + this.config = config; + for (int i = 0; i < locks.length; i++) { + locks[i] = new AsyncSemaphore(1); + } + } + + public PubSubConnectionEntry getPubSubEntry(String channelName) { + return name2PubSubConnection.get(channelName); + } + + public RFuture psubscribe(String channelName, Codec codec, RedisPubSubListener... listeners) { + return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, new RedissonPromise(), listeners); + } + + public RFuture psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener... listeners) { + RPromise promise = new RedissonPromise(); + subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners); + return promise; + } + + public RFuture subscribe(Codec codec, String channelName, RedisPubSubListener... listeners) { + return subscribe(PubSubType.SUBSCRIBE, codec, channelName, new RedissonPromise(), listeners); + } + + private RFuture subscribe(final PubSubType type, final Codec codec, final String channelName, + final RPromise promise, final RedisPubSubListener... listeners) { + final AsyncSemaphore lock = getSemaphore(channelName); + lock.acquire(new Runnable() { + @Override + public void run() { + if (promise.isDone()) { + lock.release(); + return; + } + + final RPromise result = new RedissonPromise(); + result.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + subscribe(type, codec, channelName, promise, listeners); + return; + } + + promise.trySuccess(result.getNow()); + } + }); + subscribe(codec, channelName, result, type, lock, listeners); + } + }); + return promise; + } + + public RFuture subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener... listeners) { + RPromise promise = new RedissonPromise(); + subscribe(codec, channelName, promise, PubSubType.SUBSCRIBE, semaphore, listeners); + return promise; + } + + public AsyncSemaphore getSemaphore(String channelName) { + return locks[Math.abs(channelName.hashCode() % locks.length)]; + } + + private void subscribe(final Codec codec, final String channelName, + final RPromise promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener... listeners) { + final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName); + if (connEntry != null) { + subscribe(channelName, promise, type, lock, connEntry, listeners); + return; + } + + freePubSubLock.acquire(new Runnable() { + + @Override + public void run() { + if (promise.isDone()) { + lock.release(); + freePubSubLock.release(); + return; + } + + final PubSubConnectionEntry freeEntry = freePubSubConnections.peek(); + if (freeEntry == null) { + connect(codec, channelName, promise, type, lock, listeners); + return; + } + + int remainFreeAmount = freeEntry.tryAcquire(); + if (remainFreeAmount == -1) { + throw new IllegalStateException(); + } + + final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry); + if (oldEntry != null) { + freeEntry.release(); + freePubSubLock.release(); + + subscribe(channelName, promise, type, lock, oldEntry, listeners); + return; + } + + if (remainFreeAmount == 0) { + freePubSubConnections.poll(); + } + freePubSubLock.release(); + + subscribe(channelName, promise, type, lock, freeEntry, listeners); + + if (PubSubType.PSUBSCRIBE == type) { + freeEntry.psubscribe(codec, channelName); + } else { + freeEntry.subscribe(codec, channelName); + } + } + + }); + } + + private void subscribe(final String channelName, final RPromise promise, + final PubSubType type, final AsyncSemaphore lock, final PubSubConnectionEntry connEntry, + final RedisPubSubListener... listeners) { + for (RedisPubSubListener listener : listeners) { + connEntry.addListener(channelName, listener); + } + SubscribeListener listener = connEntry.getSubscribeFuture(channelName, type); + final Future subscribeFuture = listener.getSuccessFuture(); + + subscribeFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!promise.trySuccess(connEntry)) { + for (RedisPubSubListener listener : listeners) { + connEntry.removeListener(channelName, listener); + } + if (!connEntry.hasListeners(channelName)) { + unsubscribe(channelName, lock); + } else { + lock.release(); + } + } else { + lock.release(); + } + } + }); + + connectionManager.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (promise.tryFailure(new RedisTimeoutException())) { + subscribeFuture.cancel(false); + } + } + }, config.getRetryInterval(), TimeUnit.MILLISECONDS); + } + + private void releaseSubscribeConnection(int slot, PubSubConnectionEntry pubSubEntry) { + MasterSlaveEntry entry = connectionManager.getEntry(slot); + if (entry == null) { + log.error("Node for slot: " + slot + " can't be found"); + } else { + entry.returnPubSubConnection(pubSubEntry); + } + } + + private RFuture nextPubSubConnection(int slot) { + MasterSlaveEntry entry = connectionManager.getEntry(slot); + if (entry == null) { + RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for slot: " + slot + " hasn't been discovered yet"); + return RedissonPromise.newFailedFuture(ex); + } + return entry.nextPubSubConnection(); + } + + private void connect(final Codec codec, final String channelName, + final RPromise promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener... listeners) { + final int slot = connectionManager.calcSlot(channelName); + RFuture connFuture = nextPubSubConnection(slot); + connFuture.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + freePubSubLock.release(); + lock.release(); + promise.tryFailure(future.cause()); + return; + } + + RedisPubSubConnection conn = future.getNow(); + + final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); + entry.tryAcquire(); + + final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); + if (oldEntry != null) { + releaseSubscribeConnection(slot, entry); + + freePubSubLock.release(); + + subscribe(channelName, promise, type, lock, oldEntry, listeners); + return; + } + + freePubSubConnections.add(entry); + freePubSubLock.release(); + + subscribe(channelName, promise, type, lock, entry, listeners); + + if (PubSubType.PSUBSCRIBE == type) { + entry.psubscribe(codec, channelName); + } else { + entry.subscribe(codec, channelName); + } + + } + }); + } + + public void unsubscribe(final String channelName, final AsyncSemaphore lock) { + final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); + if (entry == null) { + lock.release(); + return; + } + + entry.unsubscribe(channelName, new BaseRedisPubSubListener() { + + @Override + public boolean onStatus(PubSubType type, String channel) { + if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) { + + if (entry.release() == 1) { + freePubSubConnections.add(entry); + } + + lock.release(); + return true; + } + return false; + } + + }); + } + + public RFuture unsubscribe(final String channelName, final PubSubType topicType) { + final RPromise result = new RedissonPromise(); + final AsyncSemaphore lock = getSemaphore(channelName); + lock.acquire(new Runnable() { + @Override + public void run() { + final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); + if (entry == null) { + lock.release(); + result.trySuccess(null); + return; + } + + freePubSubLock.acquire(new Runnable() { + @Override + public void run() { + freePubSubConnections.remove(entry); + freePubSubLock.release(); + + final Codec entryCodec = entry.getConnection().getChannels().get(channelName); + RedisPubSubListener listener = new BaseRedisPubSubListener() { + + @Override + public boolean onStatus(PubSubType type, String channel) { + if (type == topicType && channel.equals(channelName)) { + lock.release(); + result.trySuccess(entryCodec); + return true; + } + return false; + } + + }; + + if (topicType == PubSubType.PUNSUBSCRIBE) { + entry.punsubscribe(channelName, listener); + } else { + entry.unsubscribe(channelName, listener); + } + } + }); + } + }); + + return result; + } + + public void punsubscribe(final String channelName, final AsyncSemaphore lock) { + final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); + if (entry == null) { + lock.release(); + return; + } + + entry.punsubscribe(channelName, new BaseRedisPubSubListener() { + + @Override + public boolean onStatus(PubSubType type, String channel) { + if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) { + + if (entry.release() == 1) { + freePubSubConnections.add(entry); + } + + lock.release(); + return true; + } + return false; + } + + }); + } + + public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) { + for (String channelName : redisPubSubConnection.getChannels().keySet()) { + PubSubConnectionEntry pubSubEntry = getPubSubEntry(channelName); + Collection> listeners = pubSubEntry.getListeners(channelName); + reattachPubSubListeners(channelName, listeners, PubSubType.UNSUBSCRIBE); + } + + for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) { + PubSubConnectionEntry pubSubEntry = getPubSubEntry(channelName); + Collection> listeners = pubSubEntry.getListeners(channelName); + reattachPubSubListeners(channelName, listeners, PubSubType.PUNSUBSCRIBE); + } + } + + private void reattachPubSubListeners(final String channelName, final Collection> listeners, final PubSubType topicType) { + RFuture subscribeCodec = unsubscribe(channelName, topicType); + if (listeners.isEmpty()) { + return; + } + + subscribeCodec.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.get() == null) { + return; + } + + Codec subscribeCodec = future.get(); + if (topicType == PubSubType.PUNSUBSCRIBE) { + psubscribe(channelName, listeners, subscribeCodec); + } else { + subscribe(channelName, listeners, subscribeCodec); + } + } + + }); + } + + private void subscribe(final String channelName, final Collection> listeners, + final Codec subscribeCodec) { + RFuture subscribeFuture = subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[listeners.size()])); + subscribeFuture.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) + throws Exception { + if (!future.isSuccess()) { + subscribe(channelName, listeners, subscribeCodec); + return; + } + + log.info("listeners of '{}' channel to '{}' have been resubscribed", channelName, future.getNow().getConnection().getRedisClient()); + } + }); + } + + private void psubscribe(final String channelName, final Collection> listeners, + final Codec subscribeCodec) { + RFuture subscribeFuture = psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()])); + subscribeFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) + throws Exception { + if (!future.isSuccess()) { + psubscribe(channelName, listeners, subscribeCodec); + return; + } + + log.debug("resubscribed listeners for '{}' channel-pattern to '{}'", channelName, future.getNow().getConnection().getRedisClient()); + } + }); + } + + +} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java index 2ceb2fa4e..4c589958a 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java @@ -34,6 +34,7 @@ import org.redisson.connection.PubSubConnectionEntry; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.pubsub.AsyncSemaphore; +import org.redisson.pubsub.PublishSubscribeService; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -48,6 +49,7 @@ import reactor.fn.Supplier; */ public class RedissonPatternTopicReactive implements RPatternTopicReactive { + final PublishSubscribeService subscribeService; final CommandReactiveExecutor commandExecutor; private final String name; private final Codec codec; @@ -60,6 +62,7 @@ public class RedissonPatternTopicReactive implements RPatternTopicReactive this.commandExecutor = commandExecutor; this.name = name; this.codec = codec; + this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService(); } @Override @@ -88,7 +91,7 @@ public class RedissonPatternTopicReactive implements RPatternTopicReactive } private void addListener(final RedisPubSubListener pubSubListener, final RPromise promise) { - RFuture future = commandExecutor.getConnectionManager().psubscribe(name, codec, pubSubListener); + RFuture future = subscribeService.psubscribe(name, codec, pubSubListener); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -112,10 +115,10 @@ public class RedissonPatternTopicReactive implements RPatternTopicReactive @Override public void removeListener(int listenerId) { - AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + AsyncSemaphore semaphore = subscribeService.getSemaphore(name); acquire(semaphore); - PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); + PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name); if (entry == null) { semaphore.release(); return; @@ -123,7 +126,7 @@ public class RedissonPatternTopicReactive implements RPatternTopicReactive entry.removeListener(name, listenerId); if (!entry.hasListeners(name)) { - commandExecutor.getConnectionManager().punsubscribe(name, semaphore); + subscribeService.punsubscribe(name, semaphore); } else { semaphore.release(); } diff --git a/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java b/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java index 940d30b47..3a1bf0519 100644 --- a/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java +++ b/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java @@ -11,12 +11,13 @@ import org.redisson.RedisRunner; import org.redisson.RedisRunner.RedisProcess; import org.redisson.Redisson; import org.redisson.api.RedissonClient; +import org.redisson.client.WriteRedisConnectionException; import org.redisson.config.Config; import org.redisson.config.ReadMode; public class WeightedRoundRobinBalancerTest { - @Test + @Test(expected = WriteRedisConnectionException.class) public void testUseMasterForReadsIfNoConnectionsToSlaves() throws IOException, InterruptedException { RedisProcess master = null; RedisProcess slave = null; From cdc1d164c367b82afc17ddce4651b6c1e966e7e2 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 14 Mar 2018 14:43:25 +0300 Subject: [PATCH 4/4] Fixed - Old/stale nodes not removed from NodesGroup #1330 --- .../main/java/org/redisson/RedisNodes.java | 51 +++++++++++++++---- .../main/java/org/redisson/api/NodeType.java | 5 ++ .../java/org/redisson/client/RedisClient.java | 5 ++ .../connection/ConnectionManager.java | 2 - .../MasterSlaveConnectionManager.java | 13 ----- .../redisson/connection/MasterSlaveEntry.java | 15 ++++-- .../balancer/LoadBalancerManager.java | 12 ++--- 7 files changed, 68 insertions(+), 35 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedisNodes.java b/redisson/src/main/java/org/redisson/RedisNodes.java index d7108a683..d49c97a68 100644 --- a/redisson/src/main/java/org/redisson/RedisNodes.java +++ b/redisson/src/main/java/org/redisson/RedisNodes.java @@ -30,9 +30,12 @@ import org.redisson.api.NodesGroup; import org.redisson.api.RFuture; import org.redisson.client.RedisConnection; import org.redisson.client.protocol.RedisCommands; +import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ConnectionListener; import org.redisson.connection.ConnectionManager; +import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.RedisClientEntry; +import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.misc.URIBuilder; import io.netty.util.concurrent.Future; @@ -54,11 +57,17 @@ public class RedisNodes implements NodesGroup { @Override public N getNode(String address) { - Collection clients = (Collection) connectionManager.getClients(); + Collection entries = connectionManager.getEntrySet(); URI addr = URIBuilder.create(address); - for (N node : clients) { - if (URIBuilder.compare(node.getAddr(), addr)) { - return node; + for (MasterSlaveEntry masterSlaveEntry : entries) { + if (URIBuilder.compare(masterSlaveEntry.getClient().getAddr(), addr)) { + return (N) new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER); + } + for (ClientConnectionsEntry entry : masterSlaveEntry.getSlaveEntries()) { + if (URIBuilder.compare(entry.getClient().getAddr(), addr) || + entry.getFreezeReason() == null || entry.getFreezeReason() == FreezeReason.RECONNECT) { + return (N) new RedisClientEntry(entry.getClient(), connectionManager.getCommandExecutor(), entry.getNodeType()); + } } } return null; @@ -66,11 +75,20 @@ public class RedisNodes implements NodesGroup { @Override public Collection getNodes(NodeType type) { - Collection clients = (Collection) connectionManager.getClients(); + Collection entries = connectionManager.getEntrySet(); List result = new ArrayList(); - for (N node : clients) { - if (node.getType().equals(type)) { - result.add(node); + for (MasterSlaveEntry masterSlaveEntry : entries) { + if (type == NodeType.MASTER) { + RedisClientEntry entry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER); + result.add((N) entry); + } + if (type == NodeType.SLAVE) { + for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getSlaveEntries()) { + if (slaveEntry.getFreezeReason() == null || slaveEntry.getFreezeReason() == FreezeReason.RECONNECT) { + RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType()); + result.add((N) entry); + } + } } } return result; @@ -79,12 +97,25 @@ public class RedisNodes implements NodesGroup { @Override public Collection getNodes() { - return (Collection) connectionManager.getClients(); + Collection entries = connectionManager.getEntrySet(); + List result = new ArrayList(); + for (MasterSlaveEntry masterSlaveEntry : entries) { + RedisClientEntry masterEntry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER); + result.add((N) masterEntry); + + for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getSlaveEntries()) { + if (slaveEntry.getFreezeReason() == null || slaveEntry.getFreezeReason() == FreezeReason.RECONNECT) { + RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType()); + result.add((N) entry); + } + } + } + return result; } @Override public boolean pingAll() { - List clients = new ArrayList(connectionManager.getClients()); + List clients = new ArrayList((Collection)getNodes()); final Map> result = new ConcurrentHashMap>(clients.size()); final CountDownLatch latch = new CountDownLatch(clients.size()); for (RedisClientEntry entry : clients) { diff --git a/redisson/src/main/java/org/redisson/api/NodeType.java b/redisson/src/main/java/org/redisson/api/NodeType.java index 9d0cc60f2..0a7f9e5db 100644 --- a/redisson/src/main/java/org/redisson/api/NodeType.java +++ b/redisson/src/main/java/org/redisson/api/NodeType.java @@ -15,6 +15,11 @@ */ package org.redisson.api; +/** + * + * @author Nikita Koksharov + * + */ public enum NodeType { MASTER, SLAVE, SENTINEL diff --git a/redisson/src/main/java/org/redisson/client/RedisClient.java b/redisson/src/main/java/org/redisson/client/RedisClient.java index f69790536..aeaae72ce 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClient.java +++ b/redisson/src/main/java/org/redisson/client/RedisClient.java @@ -315,6 +315,11 @@ public class RedisClient { return; } + if (!hasOwnTimer && !hasOwnExecutor && !hasOwnResolver && !hasOwnGroup) { + result.trySuccess(null); + return; + } + Thread t = new Thread() { @Override public void run() { diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index fd867757d..4de8b004d 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -67,8 +67,6 @@ public interface ConnectionManager { IdleConnectionWatcher getConnectionWatcher(); - Collection getClients(); - void shutdownAsync(RedisClient client); int calcSlot(String key); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 0d7a9d411..3801a29f9 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -21,7 +21,6 @@ import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -139,8 +138,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch(); - private final Map clientEntries = PlatformDependent.newConcurrentHashMap(); - private IdleConnectionWatcher connectionWatcher; private final ConnectionEventsHub connectionEventsHub = new ConnectionEventsHub(); @@ -422,22 +419,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public RedisClient createClient(NodeType type, URI address, String sslHostname) { RedisClient client = createClient(type, address, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts(), sslHostname); - clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type)); return client; } @Override public RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, String sslHostname) { RedisClient client = createClient(type, address, uri, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts(), sslHostname); - clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type)); return client; } @Override public void shutdownAsync(RedisClient client) { - if (clientEntries.remove(client) == null) { - log.error("Can't find client {}", client); - } client.shutdownAsync(); } @@ -664,11 +656,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return group.isTerminated(); } - @Override - public Collection getClients() { - return Collections.unmodifiableCollection(clientEntries.values()); - } - @Override public EventLoopGroup getGroup() { return group; diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 62d250165..ba06d3397 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -17,6 +17,7 @@ package org.redisson.connection; import java.net.InetSocketAddress; import java.net.URI; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.LinkedList; @@ -29,12 +30,9 @@ import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; -import org.redisson.client.RedisPubSubListener; -import org.redisson.client.codec.Codec; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.cluster.ClusterConnectionManager; import org.redisson.cluster.ClusterSlotRange; import org.redisson.config.MasterSlaveServersConfig; @@ -48,7 +46,6 @@ import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.misc.TransferListener; import org.redisson.misc.URIBuilder; -import org.redisson.pubsub.PublishSubscribeService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -348,6 +345,16 @@ public class MasterSlaveEntry { return addSlave(client, freezed, nodeType); } + public Collection getSlaveEntries() { + List result = new ArrayList(); + for (ClientConnectionsEntry slaveEntry : slaveBalancer.getEntries()) { + if (slaveEntry.getNodeType() == NodeType.SLAVE) { + result.add(slaveEntry); + } + } + return result; + } + public RedisClient getClient() { return masterEntry.getClient(); } diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index c36197e94..35d1c5d9f 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -17,6 +17,8 @@ package org.redisson.connection.balancer; import java.net.InetSocketAddress; import java.net.URI; +import java.util.Collection; +import java.util.Collections; import java.util.Map; import org.redisson.api.NodeType; @@ -98,6 +100,10 @@ public class LoadBalancerManager { return result; } + public Collection getEntries() { + return Collections.unmodifiableCollection(client2Entry.values()); + } + public int getAvailableClients() { int count = 0; for (ClientConnectionsEntry connectionEntry : client2Entry.values()) { @@ -154,12 +160,6 @@ public class LoadBalancerManager { return freeze(connectionEntry, freezeReason); } - - public ClientConnectionsEntry freeze(RedisClient redisClient, FreezeReason freezeReason) { - ClientConnectionsEntry connectionEntry = getEntry(redisClient); - return freeze(connectionEntry, freezeReason); - } - public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) { if (connectionEntry == null) { return null;