Sentinel support. #30

pull/38/head
Nikita 11 years ago
parent 7abe00e911
commit 56254719ff

@ -56,6 +56,7 @@ import com.lambdaworks.redis.output.IntegerOutput;
import com.lambdaworks.redis.output.KeyListOutput;
import com.lambdaworks.redis.output.KeyOutput;
import com.lambdaworks.redis.output.KeyValueOutput;
import com.lambdaworks.redis.output.ListMapOutput;
import com.lambdaworks.redis.output.MapKeyListOutput;
import com.lambdaworks.redis.output.MapOutput;
import com.lambdaworks.redis.output.MapValueListOutput;
@ -1006,6 +1007,16 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
return dispatch(PFADD, new IntegerOutput<K, V>(codec), args);
}
public Future<List<V>> getMasterAddrByKey(K name) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add("get-master-addr-by-name").addKey(name);
return dispatch(SENTINEL, new ValueListOutput<K, V>(codec), args);
}
public Future<List<Map<K, V>>> slaves(K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add("slaves").addKey(key);
return dispatch(SENTINEL, new ListMapOutput<K, V>(codec), args);
}
/**
* Wait until commands are complete or the connection timeout is reached.
*

@ -41,6 +41,7 @@ public class RedisClient {
private ChannelGroup channels;
private long timeout;
private TimeUnit unit;
private InetSocketAddress addr;
/**
* Create a new client that connects to the supplied host on the default port.
@ -60,7 +61,7 @@ public class RedisClient {
* @param port Server port.
*/
public RedisClient(EventLoopGroup group, String host, int port) {
InetSocketAddress addr = new InetSocketAddress(host, port);
addr = new InetSocketAddress(host, port);
bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(group).remoteAddress(addr);
@ -179,7 +180,7 @@ public class RedisClient {
return connection;
} catch (Throwable e) {
throw new RedisException("Unable to connect", e);
throw new RedisConnectionException("Unable to connect", e);
}
}
@ -198,6 +199,9 @@ public class RedisClient {
bootstrap.group().shutdownGracefully().syncUninterruptibly();
}
public InetSocketAddress getAddr() {
return addr;
}
}

@ -0,0 +1,11 @@
package com.lambdaworks.redis;
public class RedisConnectionException extends RedisException {
private static final long serialVersionUID = 4007817232147176510L;
public RedisConnectionException(String msg, Throwable e) {
super(msg, e);
}
}

@ -0,0 +1,54 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
/**
* {@link Map} of keys and values output.
*
* @param <K> Key type.
* @param <V> Value type.
*
* @author Will Glozer
*/
public class ListMapOutput<K, V> extends CommandOutput<K, V, List<Map<K, V>>> {
private K key;
private int index = 0;
public ListMapOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<Map<K, V>>());
}
@Override
public void set(ByteBuffer bytes) {
if (key == null) {
key = codec.decodeMapKey(bytes);
return;
}
V value = (bytes == null) ? null : codec.decodeMapValue(bytes);
if (output.isEmpty()) {
output.add(new HashMap<K, V>());
}
Map<K, V> map = output.get(index);
if (map == null) {
map = new HashMap<K, V>();
output.add(map);
}
if (map.get(key) != null) {
index++;
map = new HashMap<K, V>();
output.add(map);
}
map.put(key, value);
key = null;
}
}

@ -71,7 +71,9 @@ public enum CommandType {
BITCOUNT, BITOP, GETBIT, SETBIT,
// HyperLogLog
PFADD, PFCOUNT, PFMERGE;
PFADD, PFCOUNT, PFMERGE,
SENTINEL;
public byte[] bytes;

@ -26,6 +26,8 @@ import org.redisson.codec.RedissonCodec;
*/
public class Config {
private SentinelConnectionConfig sentinelConnectionConfig;
private MasterSlaveConnectionConfig masterSlaveConnectionConfig;
private SingleConnectionConfig singleConnectionConfig;
@ -57,6 +59,9 @@ public class Config {
if (oldConf.getMasterSlaveConnectionConfig() != null) {
setMasterSlaveConnectionConfig(new MasterSlaveConnectionConfig(oldConf.getMasterSlaveConnectionConfig()));
}
if (oldConf.getSentinelConnectionConfig() != null ) {
setSentinelConnectionConfig(new SentinelConnectionConfig(oldConf.getSentinelConnectionConfig()));
}
}
/**
@ -89,6 +94,22 @@ public class Config {
this.singleConnectionConfig = singleConnectionConfig;
}
public SentinelConnectionConfig useSentinelConnection() {
if (singleConnectionConfig != null) {
throw new IllegalStateException("single connection already used!");
}
if (sentinelConnectionConfig == null) {
sentinelConnectionConfig = new SentinelConnectionConfig();
}
return sentinelConnectionConfig;
}
SentinelConnectionConfig getSentinelConnectionConfig() {
return sentinelConnectionConfig;
}
void setSentinelConnectionConfig(SentinelConnectionConfig sentinelConnectionConfig) {
this.sentinelConnectionConfig = sentinelConnectionConfig;
}
public MasterSlaveConnectionConfig useMasterSlaveConnection() {
if (singleConnectionConfig != null) {
throw new IllegalStateException("single connection already used!");

@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentMap;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager;
import org.redisson.core.RAtomicLong;
import org.redisson.core.RBucket;
@ -72,8 +73,10 @@ public class Redisson {
Config configCopy = new Config(config);
if (configCopy.getMasterSlaveConnectionConfig() != null) {
connectionManager = new MasterSlaveConnectionManager(configCopy.getMasterSlaveConnectionConfig(), configCopy);
} else {
} else if (configCopy.getSingleConnectionConfig() != null) {
connectionManager = new SingleConnectionManager(configCopy.getSingleConnectionConfig(), configCopy);
} else {
connectionManager = new SentinelConnectionManager(configCopy.getSentinelConnectionConfig(), configCopy);
}
}
@ -86,6 +89,7 @@ public class Redisson {
Config config = new Config();
config.useSingleConnection().setAddress("127.0.0.1:6379");
// config.useMasterSlaveConnection().setMasterAddress("127.0.0.1:6379").addSlaveAddress("127.0.0.1:6389").addSlaveAddress("127.0.0.1:6399");
// config.useSentinelConnection().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379");
return create(config);
}

@ -0,0 +1,62 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
public class SentinelConnectionConfig {
private List<URI> sentinelAddresses = new ArrayList<URI>();
private String masterName;
public SentinelConnectionConfig() {
}
SentinelConnectionConfig(SentinelConnectionConfig config) {
setSentinelAddresses(config.getSentinelAddresses());
setMasterName(config.getMasterName());
}
public SentinelConnectionConfig setMasterName(String masterName) {
this.masterName = masterName;
return this;
}
public String getMasterName() {
return masterName;
}
public SentinelConnectionConfig addSentinelAddress(String ... addresses) {
for (String address : addresses) {
try {
sentinelAddresses.add(new URI("//" + address));
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Can't parse " + address);
}
}
return this;
}
public List<URI> getSentinelAddresses() {
return sentinelAddresses;
}
void setSentinelAddresses(List<URI> sentinelAddresses) {
this.sentinelAddresses = sentinelAddresses;
}
}

@ -0,0 +1,66 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.codec;
import java.nio.ByteBuffer;
import com.lambdaworks.redis.codec.Utf8StringCodec;
public class StringCodec implements RedissonCodec {
private final Utf8StringCodec codec = new Utf8StringCodec();
@Override
public Object decodeKey(ByteBuffer bytes) {
return codec.decodeKey(bytes);
}
@Override
public Object decodeValue(ByteBuffer bytes) {
return codec.decodeValue(bytes);
}
@Override
public byte[] encodeKey(Object key) {
return codec.encodeKey((String)key);
}
@Override
public byte[] encodeValue(Object value) {
return codec.encodeValue((String)value);
}
@Override
public byte[] encodeMapValue(Object value) {
return codec.encodeMapValue((String)value);
}
@Override
public byte[] encodeMapKey(Object key) {
return codec.encodeMapKey((String)key);
}
@Override
public Object decodeMapValue(ByteBuffer bytes) {
return codec.decodeMapValue(bytes);
}
@Override
public Object decodeMapKey(ByteBuffer bytes) {
return codec.decodeMapKey(bytes);
}
}

@ -15,13 +15,18 @@
*/
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.RedisConnectionException;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
@ -33,14 +38,37 @@ abstract class BaseLoadBalancer implements LoadBalancer {
private String password;
List<ConnectionEntry> clients;
final Queue<ConnectionEntry> clients = new ConcurrentLinkedQueue<ConnectionEntry>();
public void init(List<ConnectionEntry> clients, RedisCodec codec, String password) {
this.clients = clients;
public void init(RedisCodec codec, String password) {
this.codec = codec;
this.password = password;
}
public void add(ConnectionEntry entry) {
clients.add(entry);
}
public void remove(String host, int port) {
InetSocketAddress addr = new InetSocketAddress(host, port);
for (Iterator<ConnectionEntry> iterator = clients.iterator(); iterator.hasNext();) {
ConnectionEntry entry = iterator.next();
if (!entry.getClient().getAddr().equals(addr)) {
continue;
}
log.info("slave {} removed", entry.getClient().getAddr());
iterator.remove();
// TODO re-attach listeners
for (RedisPubSubConnection conn : entry.getSubscribeConnections()) {
conn.getListeners();
}
entry.shutdown();
log.info("slave {} shutdown", entry.getClient().getAddr());
break;
}
}
@SuppressWarnings("unchecked")
public RedisPubSubConnection nextPubSubConnection() {
List<ConnectionEntry> clientsCopy = new ArrayList<ConnectionEntry>(clients);
@ -60,15 +88,21 @@ abstract class BaseLoadBalancer implements LoadBalancer {
if (!entry.getSubscribeConnectionsSemaphore().tryAcquire()) {
clientsCopy.remove(index);
} else {
RedisPubSubConnection conn = entry.getSubscribeConnections().poll();
if (conn != null) {
try {
RedisPubSubConnection conn = entry.getSubscribeConnections().poll();
if (conn != null) {
return conn;
}
conn = entry.getClient().connectPubSub(codec);
if (password != null) {
conn.auth(password);
}
return conn;
} catch (RedisConnectionException e) {
// TODO connection scoring
log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr());
clientsCopy.remove(index);
}
conn = entry.getClient().connectPubSub(codec);
if (password != null) {
conn.auth(password);
}
return conn;
}
}
}
@ -95,11 +129,17 @@ abstract class BaseLoadBalancer implements LoadBalancer {
if (conn != null) {
return conn;
}
conn = entry.getClient().connect(codec);
if (password != null) {
conn.auth(password);
try {
conn = entry.getClient().connect(codec);
if (password != null) {
conn.auth(password);
}
return conn;
} catch (RedisConnectionException e) {
// TODO connection scoring
log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr());
clientsCopy.remove(index);
}
return conn;
}
}
}

@ -33,9 +33,12 @@ public class ConnectionEntry {
private final Queue<RedisPubSubConnection> subscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
private final int poolSize;
public ConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize) {
super();
this.client = client;
this.poolSize = poolSize;
this.connectionsSemaphore = new Semaphore(poolSize);
this.subscribeConnectionsSemaphore = new Semaphore(subscribePoolSize);
}
@ -44,6 +47,11 @@ public class ConnectionEntry {
return client;
}
public void shutdown() {
connectionsSemaphore.acquireUninterruptibly(poolSize);
client.shutdown();
}
public Semaphore getSubscribeConnectionsSemaphore() {
return subscribeConnectionsSemaphore;
}

@ -47,6 +47,8 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
//TODO ping support
public interface ConnectionManager {
void changeMaster(String host, int port);
<T> FutureListener<T> createReleaseWriteListener(final RedisConnection conn);
<T> FutureListener<T> createReleaseReadListener(final RedisConnection conn);

@ -23,7 +23,11 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
public interface LoadBalancer {
void init(List<ConnectionEntry> clients, RedisCodec codec, String password);
void init(RedisCodec codec, String password);
void add(ConnectionEntry entry);
void remove(String host, int port);
RedisConnection nextConnection();

@ -53,7 +53,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private EventLoopGroup group;
private final List<ConnectionEntry> slaveConnections = new ArrayList<ConnectionEntry>();
private final Queue<RedisConnection> masterConnections = new ConcurrentLinkedQueue<RedisConnection>();
private final Queue<PubSubConnectionEntry> pubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
@ -77,33 +76,28 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
void init(MasterSlaveConnectionConfig config, Config cfg) {
this.group = new NioEventLoopGroup(cfg.getThreads());
this.config = config;
this.codec = new RedisCodecWrapper(cfg.getCodec());
balancer = config.getLoadBalancer();
balancer.init(codec, config.getPassword());
for (URI address : this.config.getSlaveAddresses()) {
RedisClient client = new RedisClient(group, address.getHost(), address.getPort());
slaveClients.add(client);
slaveConnections.add(new ConnectionEntry(client,
balancer.add(new ConnectionEntry(client,
this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionPoolSize()));
}
masterClient = new RedisClient(group, this.config.getMasterAddress().getHost(), this.config.getMasterAddress().getPort());
codec = new RedisCodecWrapper(cfg.getCodec());
if (!slaveConnections.isEmpty()) {
balancer = config.getLoadBalancer();
balancer.init(slaveConnections, codec, config.getPassword());
}
masterClient = new RedisClient(group, this.config.getMasterAddress().getHost(), this.config.getMasterAddress().getPort());
masterConnectionsSemaphore = new Semaphore(this.config.getMasterConnectionPoolSize());
}
public void changeMaster(String host, int port) {
// TODO async
masterClient.shutdown();
RedisClient oldMaster = masterClient;
masterClient = new RedisClient(group, host, port);
// TODO
// 1. remove slave
// 2. re-attach listeners
// 3. remove dead slave
// TODO async & re-attach listeners
balancer.remove(host, port);
oldMaster.shutdown();
}
@Override
@ -131,11 +125,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
acquireMasterConnection();
RedisConnection<K, V> conn = masterConnections.poll();
if (conn == null) {
conn = masterClient.connect(codec);
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
if (conn != null) {
return conn;
}
conn = masterClient.connect(codec);
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
return conn;
}
@ -231,10 +227,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
void releaseMasterConnection() {
masterConnectionsSemaphore.release();
}
@Override
public void unsubscribe(PubSubConnectionEntry entry, String channelName) {
if (entry.hasListeners(channelName)) {
@ -254,7 +246,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
public void releaseWrite(RedisConnection сonnection) {
masterConnections.add(сonnection);
releaseMasterConnection();
masterConnectionsSemaphore.release();
}
public void releaseRead(RedisConnection сonnection) {

@ -0,0 +1,124 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.Config;
import org.redisson.MasterSlaveConnectionConfig;
import org.redisson.Redisson;
import org.redisson.SentinelConnectionConfig;
import org.redisson.codec.StringCodec;
import org.redisson.core.MessageListener;
import org.redisson.core.RTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisClient;
public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final List<Redisson> sentinels = new ArrayList<Redisson>();
public SentinelConnectionManager(final SentinelConnectionConfig cfg, Config config) {
init(cfg, config);
}
private void init(final SentinelConnectionConfig cfg, final Config config) {
final MasterSlaveConnectionConfig c = new MasterSlaveConnectionConfig();
for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = new RedisClient(new NioEventLoopGroup(1), addr.getHost(), addr.getPort());
RedisAsyncConnection<String, String> connection = client.connectAsync();
// TODO async
List<String> master = connection.getMasterAddrByKey(cfg.getMasterName()).awaitUninterruptibly().getNow();
String masterHost = master.get(0) + ":" + master.get(1);
c.setMasterAddress(masterHost);
log.info("master: {}", masterHost);
// TODO async
List<Map<String, String>> slaves = connection.slaves(cfg.getMasterName()).awaitUninterruptibly().getNow();
for (Map<String, String> map : slaves) {
String ip = map.get("ip");
String port = map.get("port");
log.info("slave: {}:{}", ip, port);
c.addSlaveAddress(ip + ":" + port);
}
if (slaves.isEmpty()) {
log.info("master added as slave");
c.addSlaveAddress(masterHost);
}
client.shutdown();
break;
}
final AtomicReference<String> master = new AtomicReference<String>();
for (final URI addr : cfg.getSentinelAddresses()) {
Config sc = new Config();
sc.setCodec(new StringCodec());
sc.useSingleConnection().setAddress(addr.getHost() + ":" + addr.getPort());
Redisson r = Redisson.create(sc);
sentinels.add(r);
final RTopic<String> t = r.getTopic("+switch-master");
t.addListener(new MessageListener<String>() {
@Override
public void onMessage(String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {
if (cfg.getMasterName().equals(parts[0])) {
String ip = parts[3];
String port = parts[4];
String current = master.get();
String newMaster = ip + ":" + port;
if (!newMaster.equals(current)
&& master.compareAndSet(current, newMaster)) {
log.debug("changing master to {}:{}", ip, port);
changeMaster(ip, Integer.valueOf(port));
}
}
} else {
log.error("Invalid message: {} from Sentinel({}:{}) on channel {}", msg, addr.getHost(), addr.getPort(), t.getName());
}
}
});
}
init(c, config);
}
@Override
public void shutdown() {
for (Redisson sentinel : sentinels) {
sentinel.shutdown();
}
super.shutdown();
}
}

@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2012 Nikita Koksharov
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy.MM.dd HH:mm:ss.SSS} %-5level %c{0} : %msg%n</pattern>
</encoder>
</appender>
<logger name="org.redisson" additivity="true">
<level value="trace"/>
</logger>
<logger name="org.jboss.netty" additivity="true">
<level value="debug"/>
</logger>
<root>
<level value="debug"/>
<appender-ref ref="CONSOLE"/>
</root>
</configuration>
Loading…
Cancel
Save