Merge branch 'master' into 3.0.0

pull/1821/head
Nikita 7 years ago
commit 3909f68548

@ -140,7 +140,8 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
}
public RMap<String, Object> getMap(String sessionId) {
return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId);
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
}
@Override

@ -120,7 +120,8 @@ public class RedissonSessionManager extends ManagerBase {
}
public RMap<String, Object> getMap(String sessionId) {
return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId);
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
}
@Override

@ -119,7 +119,8 @@ public class RedissonSessionManager extends ManagerBase {
}
public RMap<String, Object> getMap(String sessionId) {
return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId);
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
}
@Override

@ -119,7 +119,8 @@ public class RedissonSessionManager extends ManagerBase {
}
public RMap<String, Object> getMap(String sessionId) {
return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId);
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
}
@Override

@ -30,9 +30,12 @@ import org.redisson.api.NodesGroup;
import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionListener;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.RedisClientEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.URIBuilder;
import io.netty.util.concurrent.Future;
@ -54,11 +57,17 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
@Override
public N getNode(String address) {
Collection<N> clients = (Collection<N>) connectionManager.getClients();
Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
URI addr = URIBuilder.create(address);
for (N node : clients) {
if (URIBuilder.compare(node.getAddr(), addr)) {
return node;
for (MasterSlaveEntry masterSlaveEntry : entries) {
if (URIBuilder.compare(masterSlaveEntry.getClient().getAddr(), addr)) {
return (N) new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
}
for (ClientConnectionsEntry entry : masterSlaveEntry.getSlaveEntries()) {
if (URIBuilder.compare(entry.getClient().getAddr(), addr) ||
entry.getFreezeReason() == null || entry.getFreezeReason() == FreezeReason.RECONNECT) {
return (N) new RedisClientEntry(entry.getClient(), connectionManager.getCommandExecutor(), entry.getNodeType());
}
}
}
return null;
@ -66,11 +75,20 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
@Override
public Collection<N> getNodes(NodeType type) {
Collection<N> clients = (Collection<N>) connectionManager.getClients();
Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
List<N> result = new ArrayList<N>();
for (N node : clients) {
if (node.getType().equals(type)) {
result.add(node);
for (MasterSlaveEntry masterSlaveEntry : entries) {
if (type == NodeType.MASTER) {
RedisClientEntry entry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
result.add((N) entry);
}
if (type == NodeType.SLAVE) {
for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getSlaveEntries()) {
if (slaveEntry.getFreezeReason() == null || slaveEntry.getFreezeReason() == FreezeReason.RECONNECT) {
RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType());
result.add((N) entry);
}
}
}
}
return result;
@ -79,12 +97,25 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
@Override
public Collection<N> getNodes() {
return (Collection<N>) connectionManager.getClients();
Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
List<N> result = new ArrayList<N>();
for (MasterSlaveEntry masterSlaveEntry : entries) {
RedisClientEntry masterEntry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
result.add((N) masterEntry);
for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getSlaveEntries()) {
if (slaveEntry.getFreezeReason() == null || slaveEntry.getFreezeReason() == FreezeReason.RECONNECT) {
RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType());
result.add((N) entry);
}
}
}
return result;
}
@Override
public boolean pingAll() {
List<RedisClientEntry> clients = new ArrayList<RedisClientEntry>(connectionManager.getClients());
List<RedisClientEntry> clients = new ArrayList<RedisClientEntry>((Collection<RedisClientEntry>)getNodes());
final Map<RedisConnection, RFuture<String>> result = new ConcurrentHashMap<RedisConnection, RFuture<String>>(clients.size());
final CountDownLatch latch = new CountDownLatch(clients.size());
for (RedisClientEntry entry : clients) {

@ -106,11 +106,11 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
private RFuture<RedissonCountDownLatchEntry> subscribe() {
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
private void unsubscribe(RFuture<RedissonCountDownLatchEntry> future) {
PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
@Override

@ -59,13 +59,13 @@ public class RedissonFairLock extends RedissonLock implements RLock {
@Override
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return PUBSUB.subscribe(getEntryName() + ":" + threadId,
getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager().getSubscribeService());
}
@Override
protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
PUBSUB.unsubscribe(future.getNow(), getEntryName() + ":" + threadId,
getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager().getSubscribeService());
}
@Override

@ -348,11 +348,11 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
@Override

@ -29,6 +29,7 @@ import org.redisson.command.CommandExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.PublishSubscribeService;
/**
* Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster.
@ -39,6 +40,7 @@ import org.redisson.pubsub.AsyncSemaphore;
*/
public class RedissonPatternTopic<M> implements RPatternTopic<M> {
final PublishSubscribeService subscribeService;
final CommandExecutor commandExecutor;
private final String name;
private final Codec codec;
@ -51,6 +53,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
this.commandExecutor = commandExecutor;
this.name = name;
this.codec = codec;
this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService();
}
@Override
@ -65,7 +68,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
}
private int addListener(RedisPubSubListener<?> pubSubListener) {
RFuture<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().psubscribe(name, codec, pubSubListener);
RFuture<PubSubConnectionEntry> future = subscribeService.psubscribe(name, codec, pubSubListener);
commandExecutor.syncSubscription(future);
return System.identityHashCode(pubSubListener);
}
@ -80,10 +83,10 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
@Override
public void removeListener(int listenerId) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
AsyncSemaphore semaphore = subscribeService.getSemaphore(name);
acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
@ -91,7 +94,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().punsubscribe(name, semaphore);
subscribeService.punsubscribe(name, semaphore);
} else {
semaphore.release();
}
@ -99,10 +102,10 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
@Override
public void removeAllListeners() {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
AsyncSemaphore semaphore = subscribeService.getSemaphore(name);
acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
@ -110,7 +113,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
entry.removeAllListeners(name);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().punsubscribe(name, semaphore);
subscribeService.punsubscribe(name, semaphore);
} else {
semaphore.release();
}
@ -118,10 +121,10 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
@Override
public void removeListener(PatternMessageListener<M> listener) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
AsyncSemaphore semaphore = subscribeService.getSemaphore(name);
acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
@ -129,7 +132,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
entry.removeListener(name, listener);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().punsubscribe(name, semaphore);
subscribeService.punsubscribe(name, semaphore);
} else {
semaphore.release();
}

@ -587,11 +587,11 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
private RFuture<RedissonLockEntry> subscribe() {
return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager());
return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
private void unsubscribe(RFuture<RedissonLockEntry> future) {
semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager());
semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
@Override

@ -437,11 +437,11 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
private RFuture<RedissonLockEntry> subscribe() {
return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager());
return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
private void unsubscribe(RFuture<RedissonLockEntry> future) {
semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager());
semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
@Override

@ -34,6 +34,7 @@ import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.PublishSubscribeService;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.Future;
@ -48,6 +49,7 @@ import io.netty.util.concurrent.FutureListener;
*/
public class RedissonTopic<M> implements RTopic<M> {
final PublishSubscribeService subscribeService;
final CommandAsyncExecutor commandExecutor;
private final String name;
private final Codec codec;
@ -60,6 +62,7 @@ public class RedissonTopic<M> implements RTopic<M> {
this.commandExecutor = commandExecutor;
this.name = name;
this.codec = codec;
this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService();
}
public List<String> getChannelNames() {
@ -103,13 +106,13 @@ public class RedissonTopic<M> implements RTopic<M> {
}
private int addListener(RedisPubSubListener<?> pubSubListener) {
RFuture<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener);
RFuture<PubSubConnectionEntry> future = subscribeService.subscribe(codec, name, pubSubListener);
commandExecutor.syncSubscription(future);
return System.identityHashCode(pubSubListener);
}
public RFuture<Integer> addListenerAsync(final RedisPubSubListener<?> pubSubListener) {
RFuture<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener);
RFuture<PubSubConnectionEntry> future = subscribeService.subscribe(codec, name, pubSubListener);
final RPromise<Integer> result = new RedissonPromise<Integer>();
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
@ -127,10 +130,10 @@ public class RedissonTopic<M> implements RTopic<M> {
@Override
public void removeAllListeners() {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
AsyncSemaphore semaphore = subscribeService.getSemaphore(name);
acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
@ -138,7 +141,7 @@ public class RedissonTopic<M> implements RTopic<M> {
entry.removeAllListeners(name);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().unsubscribe(name, semaphore);
subscribeService.unsubscribe(name, semaphore);
} else {
semaphore.release();
}
@ -154,10 +157,10 @@ public class RedissonTopic<M> implements RTopic<M> {
@Override
public void removeListener(MessageListener<?> listener) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
AsyncSemaphore semaphore = subscribeService.getSemaphore(name);
acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
@ -165,7 +168,7 @@ public class RedissonTopic<M> implements RTopic<M> {
entry.removeListener(name, listener);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().unsubscribe(name, semaphore);
subscribeService.unsubscribe(name, semaphore);
} else {
semaphore.release();
}
@ -174,10 +177,10 @@ public class RedissonTopic<M> implements RTopic<M> {
@Override
public void removeListener(int listenerId) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
AsyncSemaphore semaphore = subscribeService.getSemaphore(name);
acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
@ -185,7 +188,7 @@ public class RedissonTopic<M> implements RTopic<M> {
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().unsubscribe(name, semaphore);
subscribeService.unsubscribe(name, semaphore);
} else {
semaphore.release();
}

@ -15,6 +15,11 @@
*/
package org.redisson.api;
/**
*
* @author Nikita Koksharov
*
*/
public enum NodeType {
MASTER, SLAVE, SENTINEL

@ -315,6 +315,11 @@ public class RedisClient {
return;
}
if (!hasOwnTimer && !hasOwnExecutor && !hasOwnResolver && !hasOwnGroup) {
result.trySuccess(null);
return;
}
Thread t = new Thread() {
@Override
public void run() {

@ -55,6 +55,7 @@ public class RedisClientConfig {
private boolean keepAlive;
private boolean tcpNoDelay;
private String sslHostname;
private boolean sslEnableEndpointIdentification = true;
private SslProvider sslProvider = SslProvider.JDK;
private URI sslTruststore;
@ -89,8 +90,17 @@ public class RedisClientConfig {
this.sslKeystore = config.sslKeystore;
this.sslKeystorePassword = config.sslKeystorePassword;
this.resolverGroup = config.resolverGroup;
this.sslHostname = config.sslHostname;
}
public String getSslHostname() {
return sslHostname;
}
public RedisClientConfig setSslHostname(String sslHostname) {
this.sslHostname = sslHostname;
return this;
}
public RedisClientConfig setAddress(String host, int port) {
this.address = URIBuilder.create("redis://" + host + ":" + port);
return this;

@ -33,6 +33,7 @@ import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection;
import org.redisson.config.SslProvider;
import org.redisson.misc.URIBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@ -162,7 +163,12 @@ public class RedisChannelInitializer extends ChannelInitializer<Channel> {
}
SslContext sslContext = sslContextBuilder.build();
SSLEngine sslEngine = sslContext.newEngine(ch.alloc(), config.getAddress().getHost(), config.getAddress().getPort());
String hostname = config.getSslHostname();
if (hostname == null || URIBuilder.isValidIP(hostname)) {
hostname = config.getAddress().getHost();
}
SSLEngine sslEngine = sslContext.newEngine(ch.alloc(), hostname, config.getAddress().getPort());
sslEngine.setSSLParameters(sslParams);
SslHandler sslHandler = new SslHandler(sslEngine);

@ -82,6 +82,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private RedisStrictCommand<List<ClusterNodeInfo>> clusterNodesCommand;
private String configEndpointHostName;
private boolean isConfigEndpoint;
private AddressResolver<InetSocketAddress> resolver;
@ -95,7 +97,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Throwable lastException = null;
List<String> failedMasters = new ArrayList<String>();
for (URI addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null);
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null, addr.getHost());
try {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
@ -104,6 +106,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Future<List<InetSocketAddress>> addrsFuture = resolver.resolveAll(InetSocketAddress.createUnresolved(addr.getHost(), addr.getPort()));
List<InetSocketAddress> allAddrs = addrsFuture.syncUninterruptibly().getNow();
if (allAddrs.size() > 1) {
configEndpointHostName = addr.getHost();
isConfigEndpoint = true;
} else {
resolver.close();
@ -178,8 +181,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
@Override
protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout) {
RedisClientConfig result = super.createRedisConfig(type, address, timeout, commandTimeout);
protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname) {
RedisClientConfig result = super.createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
result.setReadOnly(type == NodeType.SLAVE && config.getReadMode() != ReadMode.MASTER);
return result;
}
@ -198,7 +201,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
final RPromise<Collection<RFuture<Void>>> result = new RedissonPromise<Collection<RFuture<Void>>>();
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), null);
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), null, configEndpointHostName);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
@ -351,7 +354,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return;
}
final URI uri = iterator.next();
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, uri, null);
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, uri, null, configEndpointHostName);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
@ -651,6 +654,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
}
public String getConfigEndpointHostName() {
return configEndpointHostName;
}
@Override
public int calcSlot(String key) {

@ -26,15 +26,13 @@ import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandSyncService;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.PublishSubscribeService;
import io.netty.channel.EventLoopGroup;
import io.netty.util.Timeout;
@ -51,6 +49,8 @@ public interface ConnectionManager {
CommandSyncService getCommandExecutor();
PublishSubscribeService getSubscribeService();
ExecutorService getExecutor();
URI getLastClusterNode();
@ -59,22 +59,14 @@ public interface ConnectionManager {
boolean isClusterMode();
AsyncSemaphore getSemaphore(String channelName);
ConnectionEventsHub getConnectionEventsHub();
boolean isShutdown();
boolean isShuttingDown();
RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?>... listeners);
RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners);
IdleConnectionWatcher getConnectionWatcher();
Collection<RedisClientEntry> getClients();
void shutdownAsync(RedisClient client);
int calcSlot(String key);
@ -97,26 +89,14 @@ public interface ConnectionManager {
RFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command);
RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout);
RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname);
RedisClient createClient(NodeType type, InetSocketAddress address, URI uri);
RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, String sslHostname);
RedisClient createClient(NodeType type, URI address);
RedisClient createClient(NodeType type, URI address, String sslHostname);
MasterSlaveEntry getEntry(RedisClient redisClient);
PubSubConnectionEntry getPubSubEntry(String channelName);
RFuture<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?>... listeners);
RFuture<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners);
void unsubscribe(String channelName, AsyncSemaphore lock);
RFuture<Codec> unsubscribe(String channelName, PubSubType topicType);
void punsubscribe(String channelName, AsyncSemaphore lock);
void shutdown();
void shutdown(long quietPeriod, long timeout, TimeUnit unit);

@ -1,79 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.misc.RPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
public class FutureConnectionListener<T extends RedisConnection> implements FutureListener<Object> {
private final AtomicInteger commandsCounter = new AtomicInteger();
private final RPromise<T> connectionPromise;
private final T connection;
private final List<Runnable> commands = new ArrayList<Runnable>(4);
public FutureConnectionListener(RPromise<T> connectionFuture, T connection) {
super();
this.connectionPromise = connectionFuture;
this.connection = connection;
}
public void addCommand(final RedisCommand<?> command, final Object ... params) {
commandsCounter.incrementAndGet();
commands.add(new Runnable() {
@Override
public void run() {
RFuture<Object> future = connection.async(command, params);
future.addListener(FutureConnectionListener.this);
}
});
}
public void executeCommands() {
if (commands.isEmpty()) {
connectionPromise.trySuccess(connection);
return;
}
for (Runnable command : commands) {
command.run();
}
commands.clear();
}
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess()) {
connection.closeAsync();
connectionPromise.tryFailure(future.cause());
return;
}
if (commandsCounter.decrementAndGet() == 0) {
connectionPromise.trySuccess(connection);
}
}
}

@ -21,14 +21,10 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -37,19 +33,13 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
import org.redisson.Version;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.SubscribeListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.command.CommandSyncService;
import org.redisson.config.BaseMasterSlaveServersConfig;
@ -61,6 +51,7 @@ import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.PublishSubscribeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -136,10 +127,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected final Class<? extends SocketChannel> socketChannelClass;
protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = PlatformDependent.newConcurrentHashMap();
protected final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
protected DNSMonitor dnsMonitor;
protected MasterSlaveServersConfig config;
@ -151,8 +138,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch();
private final Map<RedisClient, RedisClientEntry> clientEntries = PlatformDependent.newConcurrentHashMap();
private IdleConnectionWatcher connectionWatcher;
private final ConnectionEventsHub connectionEventsHub = new ConnectionEventsHub();
@ -161,14 +146,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final ExecutorService executor;
private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
private final CommandSyncService commandExecutor;
private final Config cfg;
protected final DnsAddressResolverGroup resolverGroup;
private PublishSubscribeService subscribeService;
private final Map<Object, RedisConnection> nodeConnections = PlatformDependent.newConcurrentHashMap();
{
@ -179,8 +164,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
this(config);
initTimer(cfg);
this.config = cfg;
initTimer(cfg);
initSingleEntry();
}
@ -250,7 +236,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
protected RFuture<RedisConnection> connectToNode(BaseMasterSlaveServersConfig<?> cfg, final URI addr, RedisClient client) {
protected RFuture<RedisConnection> connectToNode(BaseMasterSlaveServersConfig<?> cfg, final URI addr, RedisClient client, String sslHostname) {
final Object key;
if (client != null) {
key = client;
@ -263,7 +249,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
if (addr != null) {
client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts());
client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts(), sslHostname);
}
final RPromise<RedisConnection> result = new RedissonPromise<RedisConnection>();
RFuture<RedisConnection> future = client.connectAsync();
@ -348,6 +334,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
connectionWatcher = new IdleConnectionWatcher(this, config);
subscribeService = new PublishSubscribeService(this, config);
}
protected void initSingleEntry() {
@ -430,41 +417,36 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public RedisClient createClient(NodeType type, URI address) {
RedisClient client = createClient(type, address, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts());
clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type));
public RedisClient createClient(NodeType type, URI address, String sslHostname) {
RedisClient client = createClient(type, address, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts(), sslHostname);
return client;
}
@Override
public RedisClient createClient(NodeType type, InetSocketAddress address, URI uri) {
RedisClient client = createClient(type, address, uri, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts());
clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type));
public RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, String sslHostname) {
RedisClient client = createClient(type, address, uri, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts(), sslHostname);
return client;
}
@Override
public void shutdownAsync(RedisClient client) {
if (clientEntries.remove(client) == null) {
log.error("Can't find client {}", client);
}
client.shutdownAsync();
}
@Override
public RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout) {
RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout);
public RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname) {
RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
return RedisClient.create(redisConfig);
}
private RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, int timeout, int commandTimeout) {
RedisClientConfig redisConfig = createRedisConfig(type, null, timeout, commandTimeout);
private RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, int timeout, int commandTimeout, String sslHostname) {
RedisClientConfig redisConfig = createRedisConfig(type, null, timeout, commandTimeout, sslHostname);
redisConfig.setAddress(address, uri);
return RedisClient.create(redisConfig);
}
protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout) {
protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname) {
RedisClientConfig redisConfig = new RedisClientConfig();
redisConfig.setAddress(address)
.setTimer(timer)
@ -474,6 +456,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
.setSocketChannelClass(socketChannelClass)
.setConnectTimeout(timeout)
.setCommandTimeout(commandTimeout)
.setSslHostname(sslHostname)
.setSslEnableEndpointIdentification(config.isSslEnableEndpointIdentification())
.setSslProvider(config.getSslProvider())
.setSslTruststore(config.getSslTruststore())
@ -499,304 +482,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return singleSlotRange.getStartSlot();
}
@Override
public PubSubConnectionEntry getPubSubEntry(String channelName) {
return name2PubSubConnection.get(channelName);
}
@Override
public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?>... listeners) {
return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, new RedissonPromise<PubSubConnectionEntry>(), listeners);
}
@Override
public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners);
return promise;
}
@Override
public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?>... listeners) {
return subscribe(PubSubType.SUBSCRIBE, codec, channelName, new RedissonPromise<PubSubConnectionEntry>(), listeners);
}
private RFuture<PubSubConnectionEntry> subscribe(final PubSubType type, final Codec codec, final String channelName,
final RPromise<PubSubConnectionEntry> promise, final RedisPubSubListener<?>... listeners) {
final AsyncSemaphore lock = getSemaphore(channelName);
lock.acquire(new Runnable() {
@Override
public void run() {
if (promise.isDone()) {
lock.release();
return;
}
final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
result.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
if (!future.isSuccess()) {
subscribe(type, codec, channelName, promise, listeners);
return;
}
promise.trySuccess(result.getNow());
}
});
subscribe(codec, channelName, result, type, lock, listeners);
}
});
return promise;
}
public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
subscribe(codec, channelName, promise, PubSubType.SUBSCRIBE, semaphore, listeners);
return promise;
}
public AsyncSemaphore getSemaphore(String channelName) {
return locks[Math.abs(channelName.hashCode() % locks.length)];
}
private void subscribe(final Codec codec, final String channelName,
final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
if (connEntry != null) {
subscribe(channelName, promise, type, lock, connEntry, listeners);
return;
}
freePubSubLock.acquire(new Runnable() {
@Override
public void run() {
if (promise.isDone()) {
lock.release();
freePubSubLock.release();
return;
}
final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
if (freeEntry == null) {
connect(codec, channelName, promise, type, lock, listeners);
return;
}
int remainFreeAmount = freeEntry.tryAcquire();
if (remainFreeAmount == -1) {
throw new IllegalStateException();
}
final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
if (oldEntry != null) {
freeEntry.release();
freePubSubLock.release();
subscribe(channelName, promise, type, lock, oldEntry, listeners);
return;
}
if (remainFreeAmount == 0) {
freePubSubConnections.poll();
}
freePubSubLock.release();
subscribe(channelName, promise, type, lock, freeEntry, listeners);
if (PubSubType.PSUBSCRIBE == type) {
freeEntry.psubscribe(codec, channelName);
} else {
freeEntry.subscribe(codec, channelName);
}
}
});
}
private void subscribe(final String channelName, final RPromise<PubSubConnectionEntry> promise,
final PubSubType type, final AsyncSemaphore lock, final PubSubConnectionEntry connEntry,
final RedisPubSubListener<?>... listeners) {
for (RedisPubSubListener<?> listener : listeners) {
connEntry.addListener(channelName, listener);
}
SubscribeListener listener = connEntry.getSubscribeFuture(channelName, type);
final Future<Void> subscribeFuture = listener.getSuccessFuture();
subscribeFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!promise.trySuccess(connEntry)) {
for (RedisPubSubListener<?> listener : listeners) {
connEntry.removeListener(channelName, listener);
}
if (!connEntry.hasListeners(channelName)) {
unsubscribe(channelName, lock);
} else {
lock.release();
}
} else {
lock.release();
}
}
});
newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (promise.tryFailure(new RedisTimeoutException())) {
subscribeFuture.cancel(false);
}
}
}, config.getRetryInterval(), TimeUnit.MILLISECONDS);
}
private void connect(final Codec codec, final String channelName,
final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
final int slot = calcSlot(channelName);
RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) {
freePubSubLock.release();
lock.release();
promise.tryFailure(future.cause());
return;
}
RedisPubSubConnection conn = future.getNow();
final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
freePubSubLock.release();
subscribe(channelName, promise, type, lock, oldEntry, listeners);
return;
}
freePubSubConnections.add(entry);
freePubSubLock.release();
subscribe(channelName, promise, type, lock, entry, listeners);
if (PubSubType.PSUBSCRIBE == type) {
entry.psubscribe(codec, channelName);
} else {
entry.subscribe(codec, channelName);
}
}
});
}
public void unsubscribe(final String channelName, final AsyncSemaphore lock) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
lock.release();
return;
}
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
if (entry.release() == 1) {
freePubSubConnections.add(entry);
}
lock.release();
return true;
}
return false;
}
});
}
@Override
public RFuture<Codec> unsubscribe(final String channelName, final PubSubType topicType) {
final RPromise<Codec> result = new RedissonPromise<Codec>();
final AsyncSemaphore lock = getSemaphore(channelName);
lock.acquire(new Runnable() {
@Override
public void run() {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
lock.release();
result.trySuccess(null);
return;
}
freePubSubLock.acquire(new Runnable() {
@Override
public void run() {
freePubSubConnections.remove(entry);
freePubSubLock.release();
final Codec entryCodec = entry.getConnection().getChannels().get(channelName);
RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == topicType && channel.equals(channelName)) {
lock.release();
result.trySuccess(entryCodec);
return true;
}
return false;
}
};
if (topicType == PubSubType.PUNSUBSCRIBE) {
entry.punsubscribe(channelName, listener);
} else {
entry.unsubscribe(channelName, listener);
}
}
});
}
});
return result;
}
@Override
public void punsubscribe(final String channelName, final AsyncSemaphore lock) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
lock.release();
return;
}
entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
if (entry.release() == 1) {
freePubSubConnections.add(entry);
}
lock.release();
return true;
}
return false;
}
});
}
@Override
public MasterSlaveEntry getEntry(InetSocketAddress address) {
@ -903,24 +588,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return entry.connectionReadOp(command);
}
RFuture<RedisPubSubConnection> nextPubSubConnection(int slot) {
MasterSlaveEntry entry = getEntry(slot);
if (entry == null) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for slot: " + slot + " hasn't been discovered yet");
return RedissonPromise.newFailedFuture(ex);
}
return entry.nextPubSubConnection();
}
protected void releaseSubscribeConnection(int slot, PubSubConnectionEntry pubSubEntry) {
MasterSlaveEntry entry = getEntry(slot);
if (entry == null) {
log.error("Node for slot: " + slot + " can't be found");
} else {
entry.returnPubSubConnection(pubSubEntry);
}
}
@Override
public void releaseWrite(NodeSource source, RedisConnection connection) {
MasterSlaveEntry entry = getEntry(source);
@ -989,11 +656,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return group.isTerminated();
}
@Override
public Collection<RedisClientEntry> getClients() {
return Collections.unmodifiableCollection(clientEntries.values());
}
@Override
public EventLoopGroup getGroup() {
return group;
@ -1035,6 +697,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
group.shutdownGracefully().syncUninterruptibly();
}
public PublishSubscribeService getSubscribeService() {
return subscribeService;
}
public ExecutorService getExecutor() {
return executor;
}

@ -17,6 +17,7 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
@ -29,12 +30,10 @@ import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.cluster.ClusterConnectionManager;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
@ -77,6 +76,8 @@ public class MasterSlaveEntry {
final AtomicBoolean active = new AtomicBoolean(true);
String sslHostname;
public MasterSlaveEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
for (ClusterSlotRange clusterSlotRange : slotRanges) {
for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {
@ -89,6 +90,10 @@ public class MasterSlaveEntry {
slaveBalancer = new LoadBalancerManager(config, connectionManager, this);
writeConnectionPool = new MasterConnectionPool(config, connectionManager, this);
pubSubConnectionPool = new MasterPubSubConnectionPool(config, connectionManager, this);
if (connectionManager instanceof ClusterConnectionManager) {
sslHostname = ((ClusterConnectionManager) connectionManager).getConfigEndpointHostName();
}
}
public MasterSlaveServersConfig getConfig() {
@ -111,13 +116,13 @@ public class MasterSlaveEntry {
}
public RFuture<RedisClient> setupMasterEntry(InetSocketAddress address, URI uri) {
RedisClient client = connectionManager.createClient(NodeType.MASTER, address, uri);
RedisClient client = connectionManager.createClient(NodeType.MASTER, address, uri, sslHostname);
return setupMasterEntry(client);
}
public RFuture<RedisClient> setupMasterEntry(URI address) {
RedisClient client = connectionManager.createClient(NodeType.MASTER, address);
RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname);
return setupMasterEntry(client);
}
@ -164,7 +169,7 @@ public class MasterSlaveEntry {
return false;
}
return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
return slaveDown(entry);
}
public boolean slaveDown(InetSocketAddress address, FreezeReason freezeReason) {
@ -173,7 +178,7 @@ public class MasterSlaveEntry {
return false;
}
return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
return slaveDown(entry);
}
public boolean slaveDown(URI address, FreezeReason freezeReason) {
@ -182,10 +187,10 @@ public class MasterSlaveEntry {
return false;
}
return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
return slaveDown(entry);
}
private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) {
private boolean slaveDown(ClientConnectionsEntry entry) {
// add master as slave if no more slaves available
if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) {
if (slaveBalancer.unfreeze(masterEntry.getClient().getAddr(), FreezeReason.SYSTEM)) {
@ -198,7 +203,7 @@ public class MasterSlaveEntry {
closeConnections(entry);
for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) {
reattachPubSub(connection, temporaryDown);
connectionManager.getSubscribeService().reattachPubSub(connection);
}
entry.getAllSubscribeConnections().clear();
@ -231,79 +236,6 @@ public class MasterSlaveEntry {
}
}
private void reattachPubSub(RedisPubSubConnection redisPubSubConnection, boolean temporaryDown) {
for (String channelName : redisPubSubConnection.getChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
reattachPubSubListeners(channelName, listeners, PubSubType.UNSUBSCRIBE);
}
for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
reattachPubSubListeners(channelName, listeners, PubSubType.PUNSUBSCRIBE);
}
}
private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener<?>> listeners, final PubSubType topicType) {
RFuture<Codec> subscribeCodec = connectionManager.unsubscribe(channelName, topicType);
if (listeners.isEmpty()) {
return;
}
subscribeCodec.addListener(new FutureListener<Codec>() {
@Override
public void operationComplete(Future<Codec> future) throws Exception {
if (future.get() == null) {
return;
}
Codec subscribeCodec = future.get();
if (topicType == PubSubType.PUNSUBSCRIBE) {
psubscribe(channelName, listeners, subscribeCodec);
} else {
subscribe(channelName, listeners, subscribeCodec);
}
}
});
}
private void subscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
final Codec subscribeCodec) {
RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[listeners.size()]));
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
subscribe(channelName, listeners, subscribeCodec);
return;
}
log.info("listeners of '{}' channel to '{}' have been resubscribed", channelName, future.getNow().getConnection().getRedisClient());
}
});
}
private void psubscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
final Codec subscribeCodec) {
RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()]));
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
psubscribe(channelName, listeners, subscribeCodec);
return;
}
log.debug("resubscribed listeners for '{}' channel-pattern to '{}'", channelName, future.getNow().getConnection().getRedisClient());
}
});
}
private void reattachBlockingQueue(RedisConnection connection) {
final CommandData<?, ?> commandData = connection.getCurrentCommand();
@ -404,15 +336,25 @@ public class MasterSlaveEntry {
}
private RFuture<Void> addSlave(InetSocketAddress address, URI uri, final boolean freezed, final NodeType nodeType) {
RedisClient client = connectionManager.createClient(NodeType.SLAVE, address, uri);
RedisClient client = connectionManager.createClient(NodeType.SLAVE, address, uri, sslHostname);
return addSlave(client, freezed, nodeType);
}
private RFuture<Void> addSlave(URI address, final boolean freezed, final NodeType nodeType) {
RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);
RedisClient client = connectionManager.createClient(NodeType.SLAVE, address, sslHostname);
return addSlave(client, freezed, nodeType);
}
public Collection<ClientConnectionsEntry> getSlaveEntries() {
List<ClientConnectionsEntry> result = new ArrayList<ClientConnectionsEntry>();
for (ClientConnectionsEntry slaveEntry : slaveBalancer.getEntries()) {
if (slaveEntry.getNodeType() == NodeType.SLAVE) {
result.add(slaveEntry);
}
}
return result;
}
public RedisClient getClient() {
return masterEntry.getClient();
}
@ -503,7 +445,7 @@ public class MasterSlaveEntry {
pubSubConnectionPool.remove(oldMaster);
oldMaster.freezeMaster(FreezeReason.MANAGER);
slaveDown(oldMaster, false);
slaveDown(oldMaster);
slaveBalancer.changeType(oldMaster.getClient(), NodeType.SLAVE);
slaveBalancer.changeType(newMasterClient, NodeType.MASTER);
@ -566,7 +508,7 @@ public class MasterSlaveEntry {
return slaveBalancer.getConnection(command, addr);
}
RFuture<RedisPubSubConnection> nextPubSubConnection() {
public RFuture<RedisPubSubConnection> nextPubSubConnection() {
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
return pubSubConnectionPool.get();
}

@ -67,7 +67,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
initTimer(this.config);
for (URI addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null);
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null, addr.getHost());
connectionFuture.awaitUninterruptibly();
RedisConnection connection = connectionFuture.getNow();
if (connection == null) {
@ -119,7 +119,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
return;
}
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null);
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null, addr.getHost());
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {

@ -90,7 +90,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
this.sentinelResolver = resolverGroup.getResolver(getGroup().next());
for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts());
RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts(), null);
try {
RedisConnection connection = client.connect();
if (!connection.isActive()) {
@ -261,7 +261,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
RedisClient client = iterator.next();
RFuture<RedisConnection> connectionFuture = connectToNode(null, null, client);
RFuture<RedisConnection> connectionFuture = connectToNode(null, null, client, null);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
@ -433,7 +433,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return RedissonPromise.newSucceededFuture(null);
}
client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts());
client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts(), null);
RedisClient oldClient = sentinels.putIfAbsent(key, client);
if (oldClient != null) {
return RedissonPromise.newSucceededFuture(null);

@ -17,6 +17,8 @@ package org.redisson.connection.balancer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.redisson.api.NodeType;
@ -98,6 +100,10 @@ public class LoadBalancerManager {
return result;
}
public Collection<ClientConnectionsEntry> getEntries() {
return Collections.unmodifiableCollection(client2Entry.values());
}
public int getAvailableClients() {
int count = 0;
for (ClientConnectionsEntry connectionEntry : client2Entry.values()) {
@ -154,12 +160,6 @@ public class LoadBalancerManager {
return freeze(connectionEntry, freezeReason);
}
public ClientConnectionsEntry freeze(RedisClient redisClient, FreezeReason freezeReason) {
ClientConnectionsEntry connectionEntry = getEntry(redisClient);
return freeze(connectionEntry, freezeReason);
}
public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) {
if (connectionEntry == null) {
return null;

@ -17,34 +17,44 @@ package org.redisson.misc;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.regex.Pattern;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class URIBuilder {
private static final Pattern ipv4Pattern = Pattern.compile("(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])", Pattern.CASE_INSENSITIVE);
private static final Pattern ipv6Pattern = Pattern.compile("([0-9a-f]{1,4}:){7}([0-9a-f]){1,4}", Pattern.CASE_INSENSITIVE);
public static URI create(String uri) {
URI u = URI.create(uri);
//Let's assuming most of the time it is OK.
// Let's assuming most of the time it is OK.
if (u.getHost() != null) {
return u;
}
String s = uri.substring(0, uri.lastIndexOf(":"))
.replaceFirst("redis://", "")
.replaceFirst("rediss://", "");
//Assuming this is an IPv6 format, other situations will be handled by
//Netty at a later stage.
String s = uri.substring(0, uri.lastIndexOf(":")).replaceFirst("redis://", "").replaceFirst("rediss://", "");
// Assuming this is an IPv6 format, other situations will be handled by
// Netty at a later stage.
return URI.create(uri.replace(s, "[" + s + "]"));
}
public static boolean isValidIP(String host) {
if (ipv4Pattern.matcher(host).matches()) {
return true;
}
return ipv6Pattern.matcher(host).matches();
}
public static boolean compare(InetSocketAddress entryAddr, URI addr) {
if (((entryAddr.getHostName() != null && entryAddr.getHostName().equals(addr.getHost()))
|| entryAddr.getAddress().getHostAddress().equals(addr.getHost()))
&& entryAddr.getPort() == addr.getPort()) {
&& entryAddr.getPort() == addr.getPort()) {
return true;
}
return false;
}
}

@ -40,8 +40,8 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
private final ConcurrentMap<String, E> entries = PlatformDependent.newConcurrentHashMap();
public void unsubscribe(final E entry, final String entryName, final String channelName, final ConnectionManager connectionManager) {
final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
public void unsubscribe(final E entry, final String entryName, final String channelName, final PublishSubscribeService subscribeService) {
final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
semaphore.acquire(new Runnable() {
@Override
public void run() {
@ -51,7 +51,7 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
if (!removed) {
throw new IllegalStateException();
}
connectionManager.unsubscribe(channelName, semaphore);
subscribeService.unsubscribe(channelName, semaphore);
} else {
semaphore.release();
}
@ -64,9 +64,9 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
return entries.get(entryName);
}
public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) {
public RFuture<E> subscribe(final String entryName, final String channelName, final PublishSubscribeService subscribeService) {
final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
final RPromise<E> newPromise = new RedissonPromise<E>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
@ -98,7 +98,7 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
connectionManager.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
subscribeService.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
}
};
semaphore.acquire(listener);

@ -0,0 +1,463 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.pubsub;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.SubscribeListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
*
*/
public class PublishSubscribeService {
private static final Logger log = LoggerFactory.getLogger(PublishSubscribeService.class);
private final ConnectionManager connectionManager;
private final MasterSlaveServersConfig config;
private final AsyncSemaphore[] locks = new AsyncSemaphore[50];
private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = PlatformDependent.newConcurrentHashMap();
protected final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super();
this.connectionManager = connectionManager;
this.config = config;
for (int i = 0; i < locks.length; i++) {
locks[i] = new AsyncSemaphore(1);
}
}
public PubSubConnectionEntry getPubSubEntry(String channelName) {
return name2PubSubConnection.get(channelName);
}
public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?>... listeners) {
return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, new RedissonPromise<PubSubConnectionEntry>(), listeners);
}
public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners);
return promise;
}
public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?>... listeners) {
return subscribe(PubSubType.SUBSCRIBE, codec, channelName, new RedissonPromise<PubSubConnectionEntry>(), listeners);
}
private RFuture<PubSubConnectionEntry> subscribe(final PubSubType type, final Codec codec, final String channelName,
final RPromise<PubSubConnectionEntry> promise, final RedisPubSubListener<?>... listeners) {
final AsyncSemaphore lock = getSemaphore(channelName);
lock.acquire(new Runnable() {
@Override
public void run() {
if (promise.isDone()) {
lock.release();
return;
}
final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
result.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
if (!future.isSuccess()) {
subscribe(type, codec, channelName, promise, listeners);
return;
}
promise.trySuccess(result.getNow());
}
});
subscribe(codec, channelName, result, type, lock, listeners);
}
});
return promise;
}
public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
subscribe(codec, channelName, promise, PubSubType.SUBSCRIBE, semaphore, listeners);
return promise;
}
public AsyncSemaphore getSemaphore(String channelName) {
return locks[Math.abs(channelName.hashCode() % locks.length)];
}
private void subscribe(final Codec codec, final String channelName,
final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
if (connEntry != null) {
subscribe(channelName, promise, type, lock, connEntry, listeners);
return;
}
freePubSubLock.acquire(new Runnable() {
@Override
public void run() {
if (promise.isDone()) {
lock.release();
freePubSubLock.release();
return;
}
final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
if (freeEntry == null) {
connect(codec, channelName, promise, type, lock, listeners);
return;
}
int remainFreeAmount = freeEntry.tryAcquire();
if (remainFreeAmount == -1) {
throw new IllegalStateException();
}
final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
if (oldEntry != null) {
freeEntry.release();
freePubSubLock.release();
subscribe(channelName, promise, type, lock, oldEntry, listeners);
return;
}
if (remainFreeAmount == 0) {
freePubSubConnections.poll();
}
freePubSubLock.release();
subscribe(channelName, promise, type, lock, freeEntry, listeners);
if (PubSubType.PSUBSCRIBE == type) {
freeEntry.psubscribe(codec, channelName);
} else {
freeEntry.subscribe(codec, channelName);
}
}
});
}
private void subscribe(final String channelName, final RPromise<PubSubConnectionEntry> promise,
final PubSubType type, final AsyncSemaphore lock, final PubSubConnectionEntry connEntry,
final RedisPubSubListener<?>... listeners) {
for (RedisPubSubListener<?> listener : listeners) {
connEntry.addListener(channelName, listener);
}
SubscribeListener listener = connEntry.getSubscribeFuture(channelName, type);
final Future<Void> subscribeFuture = listener.getSuccessFuture();
subscribeFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!promise.trySuccess(connEntry)) {
for (RedisPubSubListener<?> listener : listeners) {
connEntry.removeListener(channelName, listener);
}
if (!connEntry.hasListeners(channelName)) {
unsubscribe(channelName, lock);
} else {
lock.release();
}
} else {
lock.release();
}
}
});
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (promise.tryFailure(new RedisTimeoutException())) {
subscribeFuture.cancel(false);
}
}
}, config.getRetryInterval(), TimeUnit.MILLISECONDS);
}
private void releaseSubscribeConnection(int slot, PubSubConnectionEntry pubSubEntry) {
MasterSlaveEntry entry = connectionManager.getEntry(slot);
if (entry == null) {
log.error("Node for slot: " + slot + " can't be found");
} else {
entry.returnPubSubConnection(pubSubEntry);
}
}
private RFuture<RedisPubSubConnection> nextPubSubConnection(int slot) {
MasterSlaveEntry entry = connectionManager.getEntry(slot);
if (entry == null) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for slot: " + slot + " hasn't been discovered yet");
return RedissonPromise.newFailedFuture(ex);
}
return entry.nextPubSubConnection();
}
private void connect(final Codec codec, final String channelName,
final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
final int slot = connectionManager.calcSlot(channelName);
RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) {
freePubSubLock.release();
lock.release();
promise.tryFailure(future.cause());
return;
}
RedisPubSubConnection conn = future.getNow();
final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
freePubSubLock.release();
subscribe(channelName, promise, type, lock, oldEntry, listeners);
return;
}
freePubSubConnections.add(entry);
freePubSubLock.release();
subscribe(channelName, promise, type, lock, entry, listeners);
if (PubSubType.PSUBSCRIBE == type) {
entry.psubscribe(codec, channelName);
} else {
entry.subscribe(codec, channelName);
}
}
});
}
public void unsubscribe(final String channelName, final AsyncSemaphore lock) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
lock.release();
return;
}
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
if (entry.release() == 1) {
freePubSubConnections.add(entry);
}
lock.release();
return true;
}
return false;
}
});
}
public RFuture<Codec> unsubscribe(final String channelName, final PubSubType topicType) {
final RPromise<Codec> result = new RedissonPromise<Codec>();
final AsyncSemaphore lock = getSemaphore(channelName);
lock.acquire(new Runnable() {
@Override
public void run() {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
lock.release();
result.trySuccess(null);
return;
}
freePubSubLock.acquire(new Runnable() {
@Override
public void run() {
freePubSubConnections.remove(entry);
freePubSubLock.release();
final Codec entryCodec = entry.getConnection().getChannels().get(channelName);
RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == topicType && channel.equals(channelName)) {
lock.release();
result.trySuccess(entryCodec);
return true;
}
return false;
}
};
if (topicType == PubSubType.PUNSUBSCRIBE) {
entry.punsubscribe(channelName, listener);
} else {
entry.unsubscribe(channelName, listener);
}
}
});
}
});
return result;
}
public void punsubscribe(final String channelName, final AsyncSemaphore lock) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
lock.release();
return;
}
entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
if (entry.release() == 1) {
freePubSubConnections.add(entry);
}
lock.release();
return true;
}
return false;
}
});
}
public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
for (String channelName : redisPubSubConnection.getChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = getPubSubEntry(channelName);
Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
reattachPubSubListeners(channelName, listeners, PubSubType.UNSUBSCRIBE);
}
for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = getPubSubEntry(channelName);
Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
reattachPubSubListeners(channelName, listeners, PubSubType.PUNSUBSCRIBE);
}
}
private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener<?>> listeners, final PubSubType topicType) {
RFuture<Codec> subscribeCodec = unsubscribe(channelName, topicType);
if (listeners.isEmpty()) {
return;
}
subscribeCodec.addListener(new FutureListener<Codec>() {
@Override
public void operationComplete(Future<Codec> future) throws Exception {
if (future.get() == null) {
return;
}
Codec subscribeCodec = future.get();
if (topicType == PubSubType.PUNSUBSCRIBE) {
psubscribe(channelName, listeners, subscribeCodec);
} else {
subscribe(channelName, listeners, subscribeCodec);
}
}
});
}
private void subscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
final Codec subscribeCodec) {
RFuture<PubSubConnectionEntry> subscribeFuture = subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[listeners.size()]));
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
subscribe(channelName, listeners, subscribeCodec);
return;
}
log.info("listeners of '{}' channel to '{}' have been resubscribed", channelName, future.getNow().getConnection().getRedisClient());
}
});
}
private void psubscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
final Codec subscribeCodec) {
RFuture<PubSubConnectionEntry> subscribeFuture = psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()]));
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
psubscribe(channelName, listeners, subscribeCodec);
return;
}
log.debug("resubscribed listeners for '{}' channel-pattern to '{}'", channelName, future.getNow().getConnection().getRedisClient());
}
});
}
}

@ -35,6 +35,7 @@ import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.PublishSubscribeService;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -48,6 +49,7 @@ import io.netty.util.concurrent.FutureListener;
*/
public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M> {
final PublishSubscribeService subscribeService;
final CommandReactiveExecutor commandExecutor;
private final String name;
private final Codec codec;
@ -60,6 +62,7 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
this.commandExecutor = commandExecutor;
this.name = name;
this.codec = codec;
this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService();
}
@Override
@ -88,7 +91,7 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
}
private void addListener(final RedisPubSubListener<M> pubSubListener, final RPromise<Integer> promise) {
RFuture<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().psubscribe(name, codec, pubSubListener);
RFuture<PubSubConnectionEntry> future = subscribeService.psubscribe(name, codec, pubSubListener);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
@ -112,10 +115,10 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
@Override
public void removeListener(int listenerId) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
AsyncSemaphore semaphore = subscribeService.getSemaphore(name);
acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
@ -123,7 +126,7 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().punsubscribe(name, semaphore);
subscribeService.punsubscribe(name, semaphore);
} else {
semaphore.release();
}

@ -11,12 +11,13 @@ import org.redisson.RedisRunner;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.config.Config;
import org.redisson.config.ReadMode;
public class WeightedRoundRobinBalancerTest {
@Test
@Test(expected = WriteRedisConnectionException.class)
public void testUseMasterForReadsIfNoConnectionsToSlaves() throws IOException, InterruptedException {
RedisProcess master = null;
RedisProcess slave = null;

Loading…
Cancel
Save