Merge branch 'master' into 3.0.0

pull/985/head^2
Nikita 8 years ago
commit e97ebdeb35

@ -236,7 +236,7 @@ public interface RKeys extends RKeysAsync {
* <p> * <p>
* Requires Redis 4.0+ * Requires Redis 4.0+
* *
* @param keys * @param keys of objects
* @return number of removed keys * @return number of removed keys
*/ */
long unlink(String ... keys); long unlink(String ... keys);

@ -175,6 +175,9 @@ public class RedisClient {
this.commandTimeout = config.getCommandTimeout(); this.commandTimeout = config.getCommandTimeout();
} }
public String getIpAddr() {
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
}
public InetSocketAddress getAddr() { public InetSocketAddress getAddr() {
return addr; return addr;

@ -39,6 +39,11 @@ import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
*/
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());

@ -82,7 +82,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
this.config = create(cfg); this.config = create(cfg);
initTimer(this.config); initTimer(this.config);
init(this.config);
Throwable lastException = null; Throwable lastException = null;
List<String> failedMasters = new ArrayList<String>(); List<String> failedMasters = new ArrayList<String>();
@ -196,10 +195,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return result; return result;
} }
@Override
protected void initEntry(MasterSlaveServersConfig config) {
}
private RFuture<Collection<RFuture<Void>>> addMasterEntry(final ClusterPartition partition, final ClusterServersConfig cfg) { private RFuture<Collection<RFuture<Void>>> addMasterEntry(final ClusterPartition partition, final ClusterServersConfig cfg) {
if (partition.isMasterFail()) { if (partition.isMasterFail()) {
RedisException e = new RedisException("Failed to add master: " + RedisException e = new RedisException("Failed to add master: " +
@ -251,7 +246,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
final MasterSlaveEntry e; final MasterSlaveEntry e;
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(); List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
if (config.getReadMode() == ReadMode.MASTER) { if (config.checkSkipSlavesInit()) {
e = new SingleEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config); e = new SingleEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config);
} else { } else {
config.setSlaveAddresses(partition.getSlaveAddresses()); config.setSlaveAddresses(partition.getSlaveAddresses());
@ -508,8 +503,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
ClusterPartition newMasterPart = find(newPartitions, slot); ClusterPartition newMasterPart = find(newPartitions, slot);
// does partition has a new master? // does partition has a new master?
if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) { if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
log.info("changing master from {} to {} for {}",
currentPart.getMasterAddress(), newMasterPart.getMasterAddress(), slot);
URI newUri = newMasterPart.getMasterAddress(); URI newUri = newMasterPart.getMasterAddress();
URI oldUri = currentPart.getMasterAddress(); URI oldUri = currentPart.getMasterAddress();

@ -65,6 +65,8 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
*/ */
private int subscriptionConnectionPoolSize = 50; private int subscriptionConnectionPoolSize = 50;
private long dnsMonitoringInterval = 5000;
public BaseMasterSlaveServersConfig() { public BaseMasterSlaveServersConfig() {
} }
@ -79,6 +81,7 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
setSubscriptionConnectionMinimumIdleSize(config.getSubscriptionConnectionMinimumIdleSize()); setSubscriptionConnectionMinimumIdleSize(config.getSubscriptionConnectionMinimumIdleSize());
setReadMode(config.getReadMode()); setReadMode(config.getReadMode());
setSubscriptionMode(config.getSubscriptionMode()); setSubscriptionMode(config.getSubscriptionMode());
setDnsMonitoringInterval(config.getDnsMonitoringInterval());
} }
/** /**
@ -257,6 +260,10 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
return readMode; return readMode;
} }
public boolean checkSkipSlavesInit() {
return getReadMode() == ReadMode.MASTER && getSubscriptionMode() == SubscriptionMode.MASTER;
}
/** /**
* Set node type used for subscription operation. * Set node type used for subscription operation.
* <p> * <p>
@ -273,5 +280,22 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
return subscriptionMode; return subscriptionMode;
} }
/**
* Interval in milliseconds to check the endpoint's DNS<p>
* Applications must ensure the JVM DNS cache TTL is low enough to support this.<p>
* Set <code>-1</code> to disable.
* <p>
* Default is <code>5000</code>.
*
* @param dnsMonitoringInterval time
* @return config
*/
public T setDnsMonitoringInterval(long dnsMonitoringInterval) {
this.dnsMonitoringInterval = dnsMonitoringInterval;
return (T) this;
}
public long getDnsMonitoringInterval() {
return dnsMonitoringInterval;
}
} }

@ -54,17 +54,17 @@ public class ClientConnectionsEntry {
private FreezeReason freezeReason; private FreezeReason freezeReason;
final RedisClient client; final RedisClient client;
private final NodeType nodeType; private NodeType nodeType;
private ConnectionManager connectionManager; private ConnectionManager connectionManager;
private final AtomicInteger failedAttempts = new AtomicInteger(); private final AtomicInteger failedAttempts = new AtomicInteger();
public ClientConnectionsEntry(RedisClient client, int poolMinSize, int poolMaxSize, int subscribePoolMinSize, int subscribePoolMaxSize, public ClientConnectionsEntry(RedisClient client, int poolMinSize, int poolMaxSize, int subscribePoolMinSize, int subscribePoolMaxSize,
ConnectionManager connectionManager, NodeType serverMode) { ConnectionManager connectionManager, NodeType nodeType) {
this.client = client; this.client = client;
this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize); this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize);
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
this.nodeType = serverMode; this.nodeType = nodeType;
this.freeSubscribeConnectionsCounter = new AsyncSemaphore(subscribePoolMaxSize); this.freeSubscribeConnectionsCounter = new AsyncSemaphore(subscribePoolMaxSize);
if (subscribePoolMaxSize > 0) { if (subscribePoolMaxSize > 0) {
@ -73,6 +73,9 @@ public class ClientConnectionsEntry {
connectionManager.getConnectionWatcher().add(poolMinSize, poolMaxSize, freeConnections, freeConnectionsCounter); connectionManager.getConnectionWatcher().add(poolMinSize, poolMaxSize, freeConnections, freeConnectionsCounter);
} }
public void setNodeType(NodeType nodeType) {
this.nodeType = nodeType;
}
public NodeType getNodeType() { public NodeType getNodeType() {
return nodeType; return nodeType;
} }

@ -0,0 +1,139 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.redisson.client.RedisConnectionException;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
/**
* DNS changes monitor.
*
* @author Nikita Koksharov
*
*/
public class DNSMonitor {
private static final Logger log = LoggerFactory.getLogger(DNSMonitor.class);
private ScheduledFuture<?> dnsMonitorFuture;
private ConnectionManager connectionManager;
private final Map<URI, InetAddress> masters = new HashMap<URI, InetAddress>();
private final Map<URI, InetAddress> slaves = new HashMap<URI, InetAddress>();
private long dnsMonitoringInterval;
public DNSMonitor(ConnectionManager connectionManager, Set<URI> masterHosts, Set<URI> slaveHosts, long dnsMonitoringInterval) {
for (URI host : masterHosts) {
try {
masters.put(host, InetAddress.getByName(host.getHost()));
} catch (UnknownHostException e) {
throw new RedisConnectionException("Unknown host: " + host.getHost(), e);
}
}
for (URI host : slaveHosts) {
try {
slaves.put(host, InetAddress.getByName(host.getHost()));
} catch (UnknownHostException e) {
throw new RedisConnectionException("Unknown host: " + host.getHost(), e);
}
}
this.connectionManager = connectionManager;
this.dnsMonitoringInterval = dnsMonitoringInterval;
}
public void start() {
monitorDnsChange();
log.debug("DNS monitoring enabled; Current masters: {}, slaves: {}", masters, slaves);
}
public void stop() {
if (dnsMonitorFuture != null) {
dnsMonitorFuture.cancel(true);
}
}
private void monitorDnsChange() {
dnsMonitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() {
@Override
public void run() {
// As InetAddress.getByName call is blocking. Method should be run in dedicated thread
connectionManager.getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
for (Entry<URI, InetAddress> entry : masters.entrySet()) {
InetAddress master = entry.getValue();
InetAddress now = InetAddress.getByName(entry.getKey().getHost());
if (!now.getHostAddress().equals(master.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), master.getHostAddress(), now.getHostAddress());
for (MasterSlaveEntry entrySet : connectionManager.getEntrySet()) {
if (entrySet.getClient().getAddr().getHostName().equals(entry.getKey().getHost())
&& entrySet.getClient().getAddr().getPort() == entry.getKey().getPort()) {
entrySet.changeMaster(entry.getKey());
}
}
masters.put(entry.getKey(), now);
log.info("Master {} has been changed", entry.getKey().getHost());
}
}
for (Entry<URI, InetAddress> entry : slaves.entrySet()) {
InetAddress slave = entry.getValue();
InetAddress updatedSlave = InetAddress.getByName(entry.getKey().getHost());
if (!updatedSlave.getHostAddress().equals(slave.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), slave.getHostAddress(), updatedSlave.getHostAddress());
for (MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) {
URI uri = URIBuilder.create("redis://" + slave.getHostAddress() + ":" + entry.getKey().getPort());
if (masterSlaveEntry.slaveDown(uri, FreezeReason.MANAGER)) {
masterSlaveEntry.slaveUp(entry.getKey(), FreezeReason.MANAGER);
}
}
slaves.put(entry.getKey(), updatedSlave);
log.info("Slave {} has been changed", entry.getKey().getHost());
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
monitorDnsChange();
}
}
});
}
}, dnsMonitoringInterval, TimeUnit.MILLISECONDS);
}
}

@ -51,8 +51,6 @@ import org.redisson.command.CommandSyncService;
import org.redisson.config.BaseMasterSlaveServersConfig; import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
@ -128,6 +126,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>(); protected final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
protected DNSMonitor dnsMonitor;
protected MasterSlaveServersConfig config; protected MasterSlaveServersConfig config;
private final Map<Integer, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap(); private final Map<Integer, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap();
@ -161,7 +161,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) { public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
this(config); this(config);
initTimer(cfg); initTimer(cfg);
init(cfg); this.config = cfg;
initSingleEntry();
} }
public MasterSlaveConnectionManager(Config cfg) { public MasterSlaveConnectionManager(Config cfg) {
@ -237,19 +238,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return new HashSet<MasterSlaveEntry>(entries.values()); return new HashSet<MasterSlaveEntry>(entries.values());
} }
protected void init(MasterSlaveServersConfig config) {
this.config = config;
connectionWatcher = new IdleConnectionWatcher(this, config);
try {
initEntry(config);
} catch (RuntimeException e) {
stopThreads();
throw e;
}
}
protected void initTimer(MasterSlaveServersConfig config) { protected void initTimer(MasterSlaveServersConfig config) {
int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()}; int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()};
Arrays.sort(timeouts); Arrays.sort(timeouts);
@ -273,23 +261,35 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
connectionWatcher = new IdleConnectionWatcher(this, config);
} }
protected void initEntry(MasterSlaveServersConfig config) { protected void initSingleEntry() {
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>(); try {
slots.add(singleSlotRange); HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange);
MasterSlaveEntry entry;
if (config.checkSkipSlavesInit()) {
entry = new SingleEntry(slots, this, config);
RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
} else {
entry = createMasterSlaveEntry(config, slots);
}
MasterSlaveEntry entry; for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {
if (config.getReadMode() == ReadMode.MASTER) { addEntry(slot, entry);
entry = new SingleEntry(slots, this, config); }
RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
} else {
entry = createMasterSlaveEntry(config, slots);
}
for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) { if (config.getDnsMonitoringInterval() != -1) {
addEntry(slot, entry); dnsMonitor = new DNSMonitor(this, Collections.singleton(config.getMasterAddress()),
config.getSlaveAddresses(), config.getDnsMonitoringInterval());
dnsMonitor.start();
}
} catch (RuntimeException e) {
stopThreads();
throw e;
} }
} }
@ -349,7 +349,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public void shutdownAsync(RedisClient client) { public void shutdownAsync(RedisClient client) {
clientEntries.remove(client); if (clientEntries.remove(client) == null) {
log.error("Can't find client {}", client);
}
client.shutdownAsync(); client.shutdownAsync();
} }
@ -775,6 +777,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public void shutdown(long quietPeriod, long timeout, TimeUnit unit) { public void shutdown(long quietPeriod, long timeout, TimeUnit unit) {
if (dnsMonitor != null) {
dnsMonitor.stop();
}
shutdownLatch.close(); shutdownLatch.close();
shutdownPromise.trySuccess(true); shutdownPromise.trySuccess(true);
shutdownLatch.awaitUninterruptibly(); shutdownLatch.awaitUninterruptibly();

@ -42,6 +42,7 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager; import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.pool.MasterConnectionPool; import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.connection.pool.MasterPubSubConnectionPool; import org.redisson.connection.pool.MasterPubSubConnectionPool;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -86,9 +87,13 @@ public class MasterSlaveEntry {
pubSubConnectionHolder = new MasterPubSubConnectionPool(config, connectionManager, this); pubSubConnectionHolder = new MasterPubSubConnectionPool(config, connectionManager, this);
} }
public MasterSlaveServersConfig getConfig() {
return config;
}
public List<RFuture<Void>> initSlaveBalancer(Collection<URI> disconnectedNodes) { public List<RFuture<Void>> initSlaveBalancer(Collection<URI> disconnectedNodes) {
boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty() boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty()
&& config.getReadMode() == ReadMode.SLAVE && !config.checkSkipSlavesInit()
&& disconnectedNodes.size() < config.getSlaveAddresses().size(); && disconnectedNodes.size() < config.getSlaveAddresses().size();
List<RFuture<Void>> result = new LinkedList<RFuture<Void>>(); List<RFuture<Void>> result = new LinkedList<RFuture<Void>>();
@ -120,15 +125,6 @@ public class MasterSlaveEntry {
return writeConnectionHolder.add(masterEntry); return writeConnectionHolder.add(masterEntry);
} }
private boolean slaveDown(ClientConnectionsEntry entry, FreezeReason freezeReason) {
ClientConnectionsEntry e = slaveBalancer.freeze(entry, freezeReason);
if (e == null) {
return false;
}
return slaveDown(e, freezeReason == FreezeReason.SYSTEM);
}
public boolean slaveDown(URI address, FreezeReason freezeReason) { public boolean slaveDown(URI address, FreezeReason freezeReason) {
ClientConnectionsEntry entry = slaveBalancer.freeze(address, freezeReason); ClientConnectionsEntry entry = slaveBalancer.freeze(address, freezeReason);
if (entry == null) { if (entry == null) {
@ -140,7 +136,7 @@ public class MasterSlaveEntry {
private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) { private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) {
// add master as slave if no more slaves available // add master as slave if no more slaves available
if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) { if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) {
URI addr = masterEntry.getClient().getConfig().getAddress(); URI addr = masterEntry.getClient().getConfig().getAddress();
if (slaveUp(addr, FreezeReason.SYSTEM)) { if (slaveUp(addr, FreezeReason.SYSTEM)) {
log.info("master {} used as slave", addr); log.info("master {} used as slave", addr);
@ -318,13 +314,13 @@ public class MasterSlaveEntry {
return addSlave(address, true, NodeType.SLAVE); return addSlave(address, true, NodeType.SLAVE);
} }
private RFuture<Void> addSlave(URI address, boolean freezed, NodeType mode) { private RFuture<Void> addSlave(URI address, boolean freezed, NodeType nodeType) {
RedisClient client = connectionManager.createClient(NodeType.SLAVE, address); RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);
ClientConnectionsEntry entry = new ClientConnectionsEntry(client, ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
this.config.getSlaveConnectionMinimumIdleSize(), this.config.getSlaveConnectionMinimumIdleSize(),
this.config.getSlaveConnectionPoolSize(), this.config.getSlaveConnectionPoolSize(),
this.config.getSubscriptionConnectionMinimumIdleSize(), this.config.getSubscriptionConnectionMinimumIdleSize(),
this.config.getSubscriptionConnectionPoolSize(), connectionManager, mode); this.config.getSubscriptionConnectionPoolSize(), connectionManager, nodeType);
if (freezed) { if (freezed) {
synchronized (entry) { synchronized (entry) {
entry.setFreezed(freezed); entry.setFreezed(freezed);
@ -346,7 +342,7 @@ public class MasterSlaveEntry {
InetSocketAddress naddress = new InetSocketAddress(address.getHost(), address.getPort()); InetSocketAddress naddress = new InetSocketAddress(address.getHost(), address.getPort());
InetSocketAddress addr = masterEntry.getClient().getAddr(); InetSocketAddress addr = masterEntry.getClient().getAddr();
// exclude master from slaves // exclude master from slaves
if (config.getReadMode() == ReadMode.SLAVE if (!config.checkSkipSlavesInit()
&& (!addr.getAddress().getHostAddress().equals(naddress.getAddress().getHostAddress()) || naddress.getPort() != addr.getPort())) { && (!addr.getAddress().getHostAddress().equals(naddress.getAddress().getHostAddress()) || naddress.getPort() != addr.getPort())) {
slaveDown(address, FreezeReason.SYSTEM); slaveDown(address, FreezeReason.SYSTEM);
log.info("master {} excluded from slaves", addr); log.info("master {} excluded from slaves", addr);
@ -369,14 +365,21 @@ public class MasterSlaveEntry {
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
writeConnectionHolder.remove(oldMaster); writeConnectionHolder.remove(oldMaster);
pubSubConnectionHolder.remove(oldMaster); pubSubConnectionHolder.remove(oldMaster);
slaveDown(oldMaster, FreezeReason.MANAGER);
oldMaster.freezeMaster(FreezeReason.MANAGER);
slaveDown(oldMaster, false);
slaveDown(URIBuilder.create("redis://" + oldMaster.getClient().getIpAddr()), FreezeReason.MANAGER);
slaveBalancer.changeType(oldMaster.getClient().getAddr(), NodeType.SLAVE);
slaveBalancer.changeType(address, NodeType.MASTER);
// more than one slave available, so master can be removed from slaves // more than one slave available, so master can be removed from slaves
if (config.getReadMode() == ReadMode.SLAVE if (!config.checkSkipSlavesInit()
&& slaveBalancer.getAvailableClients() > 1) { && slaveBalancer.getAvailableClients() > 1) {
slaveDown(address, FreezeReason.SYSTEM); slaveDown(address, FreezeReason.SYSTEM);
} }
connectionManager.shutdownAsync(oldMaster.getClient()); connectionManager.shutdownAsync(oldMaster.getClient());
log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), address);
} }
}); });
} }
@ -415,10 +418,16 @@ public class MasterSlaveEntry {
} }
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) { public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
if (config.getReadMode() == ReadMode.MASTER) {
return connectionWriteOp(command);
}
return slaveBalancer.nextConnection(command); return slaveBalancer.nextConnection(command);
} }
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, InetSocketAddress addr) { public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, InetSocketAddress addr) {
if (config.getReadMode() == ReadMode.MASTER) {
return connectionWriteOp(command);
}
return slaveBalancer.getConnection(command, addr); return slaveBalancer.getConnection(command, addr);
} }
@ -443,6 +452,10 @@ public class MasterSlaveEntry {
} }
public void releaseRead(RedisConnection connection) { public void releaseRead(RedisConnection connection) {
if (config.getReadMode() == ReadMode.MASTER) {
releaseWrite(connection);
return;
}
slaveBalancer.returnConnection(connection); slaveBalancer.returnConnection(connection);
} }

@ -101,7 +101,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
throw new RedisConnectionException("Can't connect to servers!"); throw new RedisConnectionException("Can't connect to servers!");
} }
init(this.config); initSingleEntry();
scheduleMasterChangeCheck(cfg); scheduleMasterChangeCheck(cfg);
} }
@ -192,7 +192,6 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
if (master.equals(addr)) { if (master.equals(addr)) {
log.debug("Current master {} unchanged", master); log.debug("Current master {} unchanged", master);
} else if (currentMaster.compareAndSet(master, addr)) { } else if (currentMaster.compareAndSet(master, addr)) {
log.info("Master has changed from {} to {}", master, addr);
changeMaster(singleSlotRange.getStartSlot(), addr); changeMaster(singleSlotRange.getStartSlot(), addr);
} }
} }

@ -39,16 +39,15 @@ import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.BaseMasterSlaveServersConfig; import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.config.SentinelServersConfig; import org.redisson.config.SentinelServersConfig;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import org.redisson.misc.URIBuilder;
/** /**
* *
@ -68,13 +67,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
public SentinelConnectionManager(SentinelServersConfig cfg, Config config) { public SentinelConnectionManager(SentinelServersConfig cfg, Config config) {
super(config); super(config);
this.config = create(cfg);
initTimer(this.config);
if (cfg.getMasterName() == null) { if (cfg.getMasterName() == null) {
throw new IllegalArgumentException("masterName parameter is not defined!"); throw new IllegalArgumentException("masterName parameter is not defined!");
} }
this.config = create(cfg);
initTimer(this.config);
for (URI addr : cfg.getSentinelAddresses()) { for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts()); RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts());
try { try {
@ -126,7 +125,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
stopThreads(); stopThreads();
throw new RedisConnectionException("Can't connect to servers!"); throw new RedisConnectionException("Can't connect to servers!");
} }
init(this.config);
initSingleEntry();
List<RFuture<RedisPubSubConnection>> connectionFutures = new ArrayList<RFuture<RedisPubSubConnection>>(cfg.getSentinelAddresses().size()); List<RFuture<RedisPubSubConnection>> connectionFutures = new ArrayList<RFuture<RedisPubSubConnection>>(cfg.getSentinelAddresses().size());
for (URI addr : cfg.getSentinelAddresses()) { for (URI addr : cfg.getSentinelAddresses()) {
@ -240,7 +240,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} }
// to avoid addition twice // to avoid addition twice
if (slaves.putIfAbsent(slaveAddr, true) == null) { if (slaves.putIfAbsent(slaveAddr, true) == null && !config.checkSkipSlavesInit()) {
final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
RFuture<Void> future = entry.addSlave(URIBuilder.create(slaveAddr)); RFuture<Void> future = entry.addSlave(URIBuilder.create(slaveAddr));
future.addListener(new FutureListener<Void>() { future.addListener(new FutureListener<Void>() {
@ -312,7 +312,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} }
private void slaveDown(String ip, String port) { private void slaveDown(String ip, String port) {
if (config.getReadMode() == ReadMode.MASTER) { if (config.checkSkipSlavesInit()) {
log.warn("slave: {}:{} has down", ip, port); log.warn("slave: {}:{} has down", ip, port);
} else { } else {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
@ -369,7 +369,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} }
private void slaveUp(String ip, String port) { private void slaveUp(String ip, String port) {
if (config.getReadMode() == ReadMode.MASTER) { if (config.checkSkipSlavesInit()) {
String slaveAddr = ip + ":" + port; String slaveAddr = ip + ":" + port;
log.info("slave: {} has up", slaveAddr); log.info("slave: {} has up", slaveAddr);
return; return;
@ -395,7 +395,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
if (!newMaster.equals(current) if (!newMaster.equals(current)
&& currentMaster.compareAndSet(current, newMaster)) { && currentMaster.compareAndSet(current, newMaster)) {
changeMaster(singleSlotRange.getStartSlot(), URIBuilder.create(newMaster)); changeMaster(singleSlotRange.getStartSlot(), URIBuilder.create(newMaster));
log.info("master {} changed to {}", current, newMaster);
} }
} }
} else { } else {

@ -15,22 +15,11 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisConnectionException;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode; import org.redisson.config.ReadMode;
import org.redisson.config.SingleServerConfig; import org.redisson.config.SingleServerConfig;
import org.redisson.config.SubscriptionMode; import org.redisson.config.SubscriptionMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
/** /**
* *
@ -39,24 +28,8 @@ import io.netty.util.concurrent.ScheduledFuture;
*/ */
public class SingleConnectionManager extends MasterSlaveConnectionManager { public class SingleConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final AtomicReference<InetAddress> currentMaster = new AtomicReference<InetAddress>();
private ScheduledFuture<?> monitorFuture;
public SingleConnectionManager(SingleServerConfig cfg, Config config) { public SingleConnectionManager(SingleServerConfig cfg, Config config) {
super(create(cfg), config); super(create(cfg), config);
if (cfg.isDnsMonitoring()) {
try {
this.currentMaster.set(InetAddress.getByName(cfg.getAddress().getHost()));
} catch (UnknownHostException e) {
throw new RedisConnectionException("Unknown host: " + cfg.getAddress().getHost(), e);
}
log.debug("DNS monitoring enabled; Current master set to {}", currentMaster.get());
monitorDnsChange(cfg);
}
} }
private static MasterSlaveServersConfig create(SingleServerConfig cfg) { private static MasterSlaveServersConfig create(SingleServerConfig cfg) {
@ -84,6 +57,11 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
newconfig.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout()); newconfig.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout());
newconfig.setFailedAttempts(cfg.getFailedAttempts()); newconfig.setFailedAttempts(cfg.getFailedAttempts());
newconfig.setReconnectionTimeout(cfg.getReconnectionTimeout()); newconfig.setReconnectionTimeout(cfg.getReconnectionTimeout());
if (cfg.isDnsMonitoring()) {
newconfig.setDnsMonitoringInterval(cfg.getDnsMonitoringInterval());
} else {
newconfig.setDnsMonitoringInterval(-1);
}
newconfig.setMasterConnectionMinimumIdleSize(cfg.getConnectionMinimumIdleSize()); newconfig.setMasterConnectionMinimumIdleSize(cfg.getConnectionMinimumIdleSize());
newconfig.setSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize()); newconfig.setSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize());
@ -92,41 +70,4 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
return newconfig; return newconfig;
} }
private void monitorDnsChange(final SingleServerConfig cfg) {
monitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() {
@Override
public void run() {
// As InetAddress.getByName call is blocking. Method should be run in dedicated thread
getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
InetAddress master = currentMaster.get();
InetAddress now = InetAddress.getByName(cfg.getAddress().getHost());
if (!now.getHostAddress().equals(master.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", cfg.getAddress().getHost(), master.getHostAddress(), now.getHostAddress());
if (currentMaster.compareAndSet(master, now)) {
changeMaster(singleSlotRange.getStartSlot(), cfg.getAddress());
log.info("Master has been changed");
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
monitorDnsChange(cfg);
}
}
});
}
}, cfg.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS);
}
@Override
public void shutdown() {
if (monitorFuture != null) {
monitorFuture.cancel(true);
}
super.shutdown();
}
} }

@ -20,12 +20,14 @@ import java.net.URI;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
@ -40,6 +42,11 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
*
*/
public class LoadBalancerManager { public class LoadBalancerManager {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
@ -56,6 +63,25 @@ public class LoadBalancerManager {
pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry); pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry);
} }
public void changeType(InetSocketAddress addr, NodeType nodeType) {
ClientConnectionsEntry entry = ip2Entry.get(addr.getAddress().getHostAddress() + ":" + addr.getPort());
changeType(addr, nodeType, entry);
}
protected void changeType(Object addr, NodeType nodeType, ClientConnectionsEntry entry) {
if (entry != null) {
if (connectionManager.isClusterMode()) {
entry.getClient().getConfig().setReadOnly(nodeType == NodeType.SLAVE && connectionManager.getConfig().getReadMode() != ReadMode.MASTER);
}
entry.setNodeType(nodeType);
}
}
public void changeType(URI address, NodeType nodeType) {
ClientConnectionsEntry entry = getEntry(address);
changeType(address, nodeType, entry);
}
public RFuture<Void> add(final ClientConnectionsEntry entry) { public RFuture<Void> add(final ClientConnectionsEntry entry) {
final RPromise<Void> result = connectionManager.newPromise(); final RPromise<Void> result = connectionManager.newPromise();
FutureListener<Void> listener = new FutureListener<Void>() { FutureListener<Void> listener = new FutureListener<Void>() {
@ -67,7 +93,7 @@ public class LoadBalancerManager {
return; return;
} }
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
String addr = convert(entry.getClient().getConfig().getAddress()); String addr = entry.getClient().getIpAddr();
ip2Entry.put(addr, entry); ip2Entry.put(addr, entry);
result.trySuccess(null); result.trySuccess(null);
} }
@ -123,7 +149,7 @@ public class LoadBalancerManager {
return freeze(connectionEntry, freezeReason); return freeze(connectionEntry, freezeReason);
} }
protected ClientConnectionsEntry getEntry(URI address) { private ClientConnectionsEntry getEntry(URI address) {
String addr = convert(address); String addr = convert(address);
return ip2Entry.get(addr); return ip2Entry.get(addr);
} }

@ -341,19 +341,23 @@ abstract class ConnectionPool<T extends RedisConnection> {
connectionManager.newTimeout(new TimerTask() { connectionManager.newTimeout(new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
if (entry.getFreezeReason() != FreezeReason.RECONNECT synchronized (entry) {
|| !entry.isFreezed() if (entry.getFreezeReason() != FreezeReason.RECONNECT
|| !entry.isFreezed()
|| connectionManager.isShuttingDown()) { || connectionManager.isShuttingDown()) {
return; return;
}
} }
RFuture<RedisConnection> connectionFuture = entry.getClient().connectAsync(); RFuture<RedisConnection> connectionFuture = entry.getClient().connectAsync();
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> future) throws Exception { public void operationComplete(Future<RedisConnection> future) throws Exception {
if (entry.getFreezeReason() != FreezeReason.RECONNECT synchronized (entry) {
|| !entry.isFreezed()) { if (entry.getFreezeReason() != FreezeReason.RECONNECT
return; || !entry.isFreezed()) {
return;
}
} }
if (!future.isSuccess()) { if (!future.isSuccess()) {
@ -371,9 +375,11 @@ abstract class ConnectionPool<T extends RedisConnection> {
@Override @Override
public void operationComplete(Future<String> future) throws Exception { public void operationComplete(Future<String> future) throws Exception {
try { try {
if (entry.getFreezeReason() != FreezeReason.RECONNECT synchronized (entry) {
|| !entry.isFreezed()) { if (entry.getFreezeReason() != FreezeReason.RECONNECT
return; || !entry.isFreezed()) {
return;
}
} }
if (future.isSuccess() && "PONG".equals(future.getNow())) { if (future.isSuccess() && "PONG".equals(future.getNow())) {
@ -385,13 +391,13 @@ abstract class ConnectionPool<T extends RedisConnection> {
throws Exception { throws Exception {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveUp(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT); masterSlaveEntry.slaveUp(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT);
log.info("slave {} successfully reconnected", entry.getClient().getAddr()); log.info("slave {} has been successfully reconnected", entry.getClient().getAddr());
} else { } else {
synchronized (entry) { synchronized (entry) {
if (entry.getFreezeReason() == FreezeReason.RECONNECT) { if (entry.getFreezeReason() == FreezeReason.RECONNECT) {
entry.setFreezed(false); entry.setFreezed(false);
entry.setFreezeReason(null); entry.setFreezeReason(null);
log.info("host {} successfully reconnected", entry.getClient().getAddr()); log.info("host {} has been successfully reconnected", entry.getClient().getAddr());
} }
} }
} }

@ -32,7 +32,6 @@ import org.redisson.api.NodesGroup;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisOutOfMemoryException; import org.redisson.client.RedisOutOfMemoryException;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -536,7 +535,7 @@ public class RedissonTest {
@Test(expected = RedisConnectionException.class) @Test(expected = RedisConnectionException.class)
public void testSentinelConnectionFail() throws InterruptedException { public void testSentinelConnectionFail() throws InterruptedException {
Config config = new Config(); Config config = new Config();
config.useSentinelServers().addSentinelAddress("redis://127.99.0.1:1111"); config.useSentinelServers().addSentinelAddress("redis://127.99.0.1:1111").setMasterName("test");
Redisson.create(config); Redisson.create(config);
Thread.sleep(1500); Thread.sleep(1500);

Loading…
Cancel
Save