Elasticache Replication Group server configuration

Uses "INFO replication" to determine which node is master and which are
slaves.
pull/244/head
Steve Ungerer 10 years ago
parent 57973aba0c
commit 3c5346f97e

@ -33,6 +33,8 @@ public class Config {
private SingleServerConfig singleServerConfig;
private ClusterServersConfig clusterServersConfig;
private ElasticacheReplicationGroupServersConfig elasticacheReplicationGroupServersConfig;
/**
* Threads amount shared between all redis node clients
@ -71,6 +73,10 @@ public class Config {
if (oldConf.getClusterServersConfig() != null ) {
setClusterServersConfig(new ClusterServersConfig(oldConf.getClusterServersConfig()));
}
if (oldConf.getElasticacheReplicationGroupServersConfig() != null) {
setElasticacheReplicationGroupServersConfig(new ElasticacheReplicationGroupServersConfig(oldConf.getElasticacheReplicationGroupServersConfig()));
}
}
/**
@ -91,6 +97,7 @@ public class Config {
checkMasterSlaveServersConfig();
checkSentinelServersConfig();
checkSingleServerConfig();
checkElasticacheReplicationGroupServersConfig();
if (clusterServersConfig == null) {
clusterServersConfig = new ClusterServersConfig();
@ -105,17 +112,37 @@ public class Config {
this.clusterServersConfig = clusterServersConfig;
}
public ElasticacheReplicationGroupServersConfig useElasticacheReplicationGroupServers() {
checkClusterServersConfig();
checkMasterSlaveServersConfig();
checkSentinelServersConfig();
checkSingleServerConfig();
if (elasticacheReplicationGroupServersConfig == null) {
elasticacheReplicationGroupServersConfig = new ElasticacheReplicationGroupServersConfig();
}
return elasticacheReplicationGroupServersConfig;
}
ElasticacheReplicationGroupServersConfig getElasticacheReplicationGroupServersConfig() {
return elasticacheReplicationGroupServersConfig;
}
void setElasticacheReplicationGroupServersConfig(ElasticacheReplicationGroupServersConfig elasticacheReplicationGroupServersConfig) {
this.elasticacheReplicationGroupServersConfig = elasticacheReplicationGroupServersConfig;
}
public SingleServerConfig useSingleServer() {
checkClusterServersConfig();
checkMasterSlaveServersConfig();
checkSentinelServersConfig();
checkElasticacheReplicationGroupServersConfig();
if (singleServerConfig == null) {
singleServerConfig = new SingleServerConfig();
}
return singleServerConfig;
}
SingleServerConfig getSingleServerConfig() {
return singleServerConfig;
}
@ -127,6 +154,7 @@ public class Config {
checkClusterServersConfig();
checkSingleServerConfig();
checkMasterSlaveServersConfig();
checkElasticacheReplicationGroupServersConfig();
if (sentinelServersConfig == null) {
sentinelServersConfig = new SentinelServersConfig();
@ -145,6 +173,7 @@ public class Config {
checkClusterServersConfig();
checkSingleServerConfig();
checkSentinelServersConfig();
checkElasticacheReplicationGroupServersConfig();
if (masterSlaveServersConfig == null) {
masterSlaveServersConfig = new MasterSlaveServersConfig();
@ -194,6 +223,12 @@ public class Config {
throw new IllegalStateException("single server config already used!");
}
}
private void checkElasticacheReplicationGroupServersConfig() {
if (elasticacheReplicationGroupServersConfig != null) {
throw new IllegalStateException("elasticache replication group servers config already used!");
}
}
/**
* Activates an unix socket if servers binded to loopback interface.

@ -0,0 +1,86 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.redisson.misc.URIBuilder;
/**
* Configuration for an AWS ElastiCache replication group. A replication group is composed
* of a single master endpoint and multiple read slaves.
*
* @author Steve Ungerer
*/
public class ElasticacheReplicationGroupServersConfig extends BaseMasterSlaveServersConfig<ElasticacheReplicationGroupServersConfig> {
/**
* Replication group node urls list
*/
private List<URI> nodeAddresses = new ArrayList<URI>();
/**
* Replication group scan interval in milliseconds
*/
private int scanInterval = 1000;
public ElasticacheReplicationGroupServersConfig() {
}
ElasticacheReplicationGroupServersConfig(ElasticacheReplicationGroupServersConfig config) {
super(config);
setNodeAddresses(config.getNodeAddresses());
setScanInterval(config.getScanInterval());
}
/**
* Add Redis cluster node address. Use follow format -- <code>host:port</code>
*
* @param addresses in <code>host:port</code> format
* @return
*/
public ElasticacheReplicationGroupServersConfig addNodeAddress(String ... addresses) {
for (String address : addresses) {
nodeAddresses.add(URIBuilder.create(address));
}
return this;
}
public List<URI> getNodeAddresses() {
return nodeAddresses;
}
void setNodeAddresses(List<URI> nodeAddresses) {
this.nodeAddresses = nodeAddresses;
}
public int getScanInterval() {
return scanInterval;
}
/**
* Redis cluster scan interval in milliseconds
*
* @param scanInterval in milliseconds
* @return
*/
public ElasticacheReplicationGroupServersConfig setScanInterval(int scanInterval) {
this.scanInterval = scanInterval;
return this;
}
}

@ -71,7 +71,10 @@ public class MasterSlaveServersConfig extends BaseMasterSlaveServersConfig<Maste
}
return this;
}
public MasterSlaveServersConfig addSlaveAddress(URI slaveAddress) {
slaveAddresses.add(slaveAddress);
return this;
}
public List<URI> getSlaveAddresses() {
return slaveAddresses;
}

@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ClusterConnectionManager;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ElasticacheReplicationGroupConnectionManager;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager;
@ -78,6 +79,8 @@ public class Redisson implements RedissonClient {
connectionManager = new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy);
} else if (configCopy.getClusterServersConfig() != null) {
connectionManager = new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy);
} else if (configCopy.getElasticacheReplicationGroupServersConfig() != null) {
connectionManager = new ElasticacheReplicationGroupConnectionManager(configCopy.getElasticacheReplicationGroupServersConfig(), configCopy);
} else {
throw new IllegalArgumentException("server(s) address(es) not defined!");
}

@ -173,4 +173,5 @@ public interface RedisCommands {
RedisStrictCommand<List<String>> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<List<String>>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder());
RedisStrictCommand<List<Map<String, String>>> SENTINEL_SLAVES = new RedisStrictCommand<List<Map<String, String>>>("SENTINEL", "SLAVES", new StringMapReplayDecoder());
RedisStrictCommand<String> INFO_REPLICATION = new RedisStrictCommand<String>("INFO", "replication", new StringDataDecoder());
}

@ -0,0 +1,164 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.Config;
import org.redisson.ElasticacheReplicationGroupServersConfig;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.protocol.RedisCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
public class ElasticacheReplicationGroupConnectionManager extends MasterSlaveConnectionManager {
private static final String ROLE_KEY = "role:";
private final Logger log = LoggerFactory.getLogger(getClass());
private AtomicReference<URI> currentMaster = new AtomicReference<URI>();
private final Map<URI, RedisConnection> nodeConnections = new HashMap<URI, RedisConnection>();
private ScheduledFuture<?> monitorFuture;
private enum Role {
master,
slave
}
public ElasticacheReplicationGroupConnectionManager(ElasticacheReplicationGroupServersConfig cfg, Config config) {
init(config);
this.config = create(cfg);
for (URI addr : cfg.getNodeAddresses()) {
RedisConnection connection = connect(cfg, addr);
if (connection == null) {
continue;
}
Role role = determineRole(connection.sync(RedisCommands.INFO_REPLICATION));
if (Role.master.equals(role)) {
currentMaster.set(addr);
log.info("{} is the master", addr);
this.config.setMasterAddress(addr);
} else {
this.config.addSlaveAddress(addr);
}
}
init(this.config);
monitorRoleChange(cfg);
}
private RedisConnection connect(ElasticacheReplicationGroupServersConfig cfg, URI addr) {
RedisConnection connection = nodeConnections.get(addr);
if (connection != null) {
return connection;
}
RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getTimeout());
try {
connection = client.connect();
nodeConnections.put(addr, connection);
} catch (RedisConnectionException e) {
log.warn(e.getMessage(), e);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return connection;
}
private void monitorRoleChange(final ElasticacheReplicationGroupServersConfig cfg) {
monitorFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
URI master = currentMaster.get();
for (URI addr : cfg.getNodeAddresses()) {
RedisConnection connection = connect(cfg, addr);
String replInfo = connection.sync(RedisCommands.INFO_REPLICATION);
log.trace("{} repl info: {}", addr, replInfo);
Role role = determineRole(replInfo);
log.debug("Current master: {} / node {} is {}", master, addr, role);
if (Role.master.equals(role) && master.equals(addr)) {
log.debug("Current master {} unchanged", master);
} else if (Role.master.equals(role) && !master.equals(addr) && currentMaster.compareAndSet(master, addr)) {
log.info("Master has changed from {} to {}", master, addr);
changeMaster(MAX_SLOT, addr.getHost(), addr.getPort());
slaveDown(MAX_SLOT, master.getHost(), master.getPort());
break;
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}
private Role determineRole(String data) {
String[] lines = data.split("\\r\\n");
for (String s : lines) {
if (s.startsWith(ROLE_KEY)) {
return Role.valueOf(s.substring(ROLE_KEY.length()));
}
}
throw new RuntimeException("Cannot determine role from " + data);
}
private MasterSlaveServersConfig create(ElasticacheReplicationGroupServersConfig cfg) {
MasterSlaveServersConfig c = new MasterSlaveServersConfig();
c.setRetryInterval(cfg.getRetryInterval());
c.setRetryAttempts(cfg.getRetryAttempts());
c.setTimeout(cfg.getTimeout());
c.setPingTimeout(cfg.getPingTimeout());
c.setLoadBalancer(cfg.getLoadBalancer());
c.setPassword(cfg.getPassword());
c.setDatabase(cfg.getDatabase());
c.setClientName(cfg.getClientName());
c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
return c;
}
@Override
public void shutdown() {
monitorFuture.cancel(true);
super.shutdown();
for (RedisConnection connection : nodeConnections.values()) {
connection.getRedisClient().shutdown();
}
}
}
Loading…
Cancel
Save