From 56254719ff615662961b40bc6f0526ea96dbdb25 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 11 Jun 2014 21:41:01 +0400 Subject: [PATCH] Sentinel support. #30 --- .../redis/RedisAsyncConnection.java | 11 ++ .../com/lambdaworks/redis/RedisClient.java | 8 +- .../redis/RedisConnectionException.java | 11 ++ .../redis/output/ListMapOutput.java | 54 ++++++++ .../redis/protocol/CommandType.java | 4 +- src/main/java/org/redisson/Config.java | 21 +++ src/main/java/org/redisson/Redisson.java | 6 +- .../redisson/SentinelConnectionConfig.java | 62 +++++++++ .../java/org/redisson/codec/StringCodec.java | 66 ++++++++++ .../redisson/connection/BaseLoadBalancer.java | 68 ++++++++-- .../redisson/connection/ConnectionEntry.java | 8 ++ .../connection/ConnectionManager.java | 2 + .../org/redisson/connection/LoadBalancer.java | 6 +- .../MasterSlaveConnectionManager.java | 44 +++---- .../connection/SentinelConnectionManager.java | 124 ++++++++++++++++++ src/test/resources/logback.xml | 40 ++++++ 16 files changed, 490 insertions(+), 45 deletions(-) create mode 100644 src/main/java/com/lambdaworks/redis/RedisConnectionException.java create mode 100644 src/main/java/com/lambdaworks/redis/output/ListMapOutput.java create mode 100644 src/main/java/org/redisson/SentinelConnectionConfig.java create mode 100644 src/main/java/org/redisson/codec/StringCodec.java create mode 100644 src/main/java/org/redisson/connection/SentinelConnectionManager.java create mode 100644 src/test/resources/logback.xml diff --git a/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java b/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java index e78d2e944..44ab6b4bd 100644 --- a/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java +++ b/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java @@ -56,6 +56,7 @@ import com.lambdaworks.redis.output.IntegerOutput; import com.lambdaworks.redis.output.KeyListOutput; import com.lambdaworks.redis.output.KeyOutput; import com.lambdaworks.redis.output.KeyValueOutput; +import com.lambdaworks.redis.output.ListMapOutput; import com.lambdaworks.redis.output.MapKeyListOutput; import com.lambdaworks.redis.output.MapOutput; import com.lambdaworks.redis.output.MapValueListOutput; @@ -1006,6 +1007,16 @@ public class RedisAsyncConnection extends ChannelInboundHandlerAdapter { return dispatch(PFADD, new IntegerOutput(codec), args); } + public Future> getMasterAddrByKey(K name) { + CommandArgs args = new CommandArgs(codec).add("get-master-addr-by-name").addKey(name); + return dispatch(SENTINEL, new ValueListOutput(codec), args); + } + + public Future>> slaves(K key) { + CommandArgs args = new CommandArgs(codec).add("slaves").addKey(key); + return dispatch(SENTINEL, new ListMapOutput(codec), args); + } + /** * Wait until commands are complete or the connection timeout is reached. * diff --git a/src/main/java/com/lambdaworks/redis/RedisClient.java b/src/main/java/com/lambdaworks/redis/RedisClient.java index c514fb1c4..47947924b 100644 --- a/src/main/java/com/lambdaworks/redis/RedisClient.java +++ b/src/main/java/com/lambdaworks/redis/RedisClient.java @@ -41,6 +41,7 @@ public class RedisClient { private ChannelGroup channels; private long timeout; private TimeUnit unit; + private InetSocketAddress addr; /** * Create a new client that connects to the supplied host on the default port. @@ -60,7 +61,7 @@ public class RedisClient { * @param port Server port. */ public RedisClient(EventLoopGroup group, String host, int port) { - InetSocketAddress addr = new InetSocketAddress(host, port); + addr = new InetSocketAddress(host, port); bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(group).remoteAddress(addr); @@ -179,7 +180,7 @@ public class RedisClient { return connection; } catch (Throwable e) { - throw new RedisException("Unable to connect", e); + throw new RedisConnectionException("Unable to connect", e); } } @@ -198,6 +199,9 @@ public class RedisClient { bootstrap.group().shutdownGracefully().syncUninterruptibly(); } + public InetSocketAddress getAddr() { + return addr; + } } diff --git a/src/main/java/com/lambdaworks/redis/RedisConnectionException.java b/src/main/java/com/lambdaworks/redis/RedisConnectionException.java new file mode 100644 index 000000000..a50379fbc --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/RedisConnectionException.java @@ -0,0 +1,11 @@ +package com.lambdaworks.redis; + +public class RedisConnectionException extends RedisException { + + private static final long serialVersionUID = 4007817232147176510L; + + public RedisConnectionException(String msg, Throwable e) { + super(msg, e); + } + +} diff --git a/src/main/java/com/lambdaworks/redis/output/ListMapOutput.java b/src/main/java/com/lambdaworks/redis/output/ListMapOutput.java new file mode 100644 index 000000000..60706cc55 --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/output/ListMapOutput.java @@ -0,0 +1,54 @@ +// Copyright (C) 2011 - Will Glozer. All rights reserved. + +package com.lambdaworks.redis.output; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.lambdaworks.redis.codec.RedisCodec; +import com.lambdaworks.redis.protocol.CommandOutput; + +/** + * {@link Map} of keys and values output. + * + * @param Key type. + * @param Value type. + * + * @author Will Glozer + */ +public class ListMapOutput extends CommandOutput>> { + private K key; + private int index = 0; + + public ListMapOutput(RedisCodec codec) { + super(codec, new ArrayList>()); + } + + @Override + public void set(ByteBuffer bytes) { + if (key == null) { + key = codec.decodeMapKey(bytes); + return; + } + + V value = (bytes == null) ? null : codec.decodeMapValue(bytes); + if (output.isEmpty()) { + output.add(new HashMap()); + } + Map map = output.get(index); + if (map == null) { + map = new HashMap(); + output.add(map); + } + if (map.get(key) != null) { + index++; + map = new HashMap(); + output.add(map); + } + map.put(key, value); + key = null; + } +} diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandType.java b/src/main/java/com/lambdaworks/redis/protocol/CommandType.java index f6572803d..d0bb5c32d 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandType.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandType.java @@ -71,7 +71,9 @@ public enum CommandType { BITCOUNT, BITOP, GETBIT, SETBIT, // HyperLogLog - PFADD, PFCOUNT, PFMERGE; + PFADD, PFCOUNT, PFMERGE, + + SENTINEL; public byte[] bytes; diff --git a/src/main/java/org/redisson/Config.java b/src/main/java/org/redisson/Config.java index 3e941a4ba..7f3172381 100644 --- a/src/main/java/org/redisson/Config.java +++ b/src/main/java/org/redisson/Config.java @@ -26,6 +26,8 @@ import org.redisson.codec.RedissonCodec; */ public class Config { + private SentinelConnectionConfig sentinelConnectionConfig; + private MasterSlaveConnectionConfig masterSlaveConnectionConfig; private SingleConnectionConfig singleConnectionConfig; @@ -57,6 +59,9 @@ public class Config { if (oldConf.getMasterSlaveConnectionConfig() != null) { setMasterSlaveConnectionConfig(new MasterSlaveConnectionConfig(oldConf.getMasterSlaveConnectionConfig())); } + if (oldConf.getSentinelConnectionConfig() != null ) { + setSentinelConnectionConfig(new SentinelConnectionConfig(oldConf.getSentinelConnectionConfig())); + } } /** @@ -89,6 +94,22 @@ public class Config { this.singleConnectionConfig = singleConnectionConfig; } + public SentinelConnectionConfig useSentinelConnection() { + if (singleConnectionConfig != null) { + throw new IllegalStateException("single connection already used!"); + } + if (sentinelConnectionConfig == null) { + sentinelConnectionConfig = new SentinelConnectionConfig(); + } + return sentinelConnectionConfig; + } + SentinelConnectionConfig getSentinelConnectionConfig() { + return sentinelConnectionConfig; + } + void setSentinelConnectionConfig(SentinelConnectionConfig sentinelConnectionConfig) { + this.sentinelConnectionConfig = sentinelConnectionConfig; + } + public MasterSlaveConnectionConfig useMasterSlaveConnection() { if (singleConnectionConfig != null) { throw new IllegalStateException("single connection already used!"); diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 8ccd9ff66..565593a00 100644 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentMap; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveConnectionManager; +import org.redisson.connection.SentinelConnectionManager; import org.redisson.connection.SingleConnectionManager; import org.redisson.core.RAtomicLong; import org.redisson.core.RBucket; @@ -72,8 +73,10 @@ public class Redisson { Config configCopy = new Config(config); if (configCopy.getMasterSlaveConnectionConfig() != null) { connectionManager = new MasterSlaveConnectionManager(configCopy.getMasterSlaveConnectionConfig(), configCopy); - } else { + } else if (configCopy.getSingleConnectionConfig() != null) { connectionManager = new SingleConnectionManager(configCopy.getSingleConnectionConfig(), configCopy); + } else { + connectionManager = new SentinelConnectionManager(configCopy.getSentinelConnectionConfig(), configCopy); } } @@ -86,6 +89,7 @@ public class Redisson { Config config = new Config(); config.useSingleConnection().setAddress("127.0.0.1:6379"); // config.useMasterSlaveConnection().setMasterAddress("127.0.0.1:6379").addSlaveAddress("127.0.0.1:6389").addSlaveAddress("127.0.0.1:6399"); +// config.useSentinelConnection().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379"); return create(config); } diff --git a/src/main/java/org/redisson/SentinelConnectionConfig.java b/src/main/java/org/redisson/SentinelConnectionConfig.java new file mode 100644 index 000000000..533769548 --- /dev/null +++ b/src/main/java/org/redisson/SentinelConnectionConfig.java @@ -0,0 +1,62 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + +public class SentinelConnectionConfig { + + private List sentinelAddresses = new ArrayList(); + + private String masterName; + + public SentinelConnectionConfig() { + } + + SentinelConnectionConfig(SentinelConnectionConfig config) { + setSentinelAddresses(config.getSentinelAddresses()); + setMasterName(config.getMasterName()); + } + + public SentinelConnectionConfig setMasterName(String masterName) { + this.masterName = masterName; + return this; + } + public String getMasterName() { + return masterName; + } + + public SentinelConnectionConfig addSentinelAddress(String ... addresses) { + for (String address : addresses) { + try { + sentinelAddresses.add(new URI("//" + address)); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Can't parse " + address); + } + } + return this; + } + public List getSentinelAddresses() { + return sentinelAddresses; + } + void setSentinelAddresses(List sentinelAddresses) { + this.sentinelAddresses = sentinelAddresses; + } + +} diff --git a/src/main/java/org/redisson/codec/StringCodec.java b/src/main/java/org/redisson/codec/StringCodec.java new file mode 100644 index 000000000..09c54f22e --- /dev/null +++ b/src/main/java/org/redisson/codec/StringCodec.java @@ -0,0 +1,66 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.codec; + +import java.nio.ByteBuffer; + +import com.lambdaworks.redis.codec.Utf8StringCodec; + +public class StringCodec implements RedissonCodec { + + private final Utf8StringCodec codec = new Utf8StringCodec(); + + @Override + public Object decodeKey(ByteBuffer bytes) { + return codec.decodeKey(bytes); + } + + @Override + public Object decodeValue(ByteBuffer bytes) { + return codec.decodeValue(bytes); + } + + @Override + public byte[] encodeKey(Object key) { + return codec.encodeKey((String)key); + } + + @Override + public byte[] encodeValue(Object value) { + return codec.encodeValue((String)value); + } + + @Override + public byte[] encodeMapValue(Object value) { + return codec.encodeMapValue((String)value); + } + + @Override + public byte[] encodeMapKey(Object key) { + return codec.encodeMapKey((String)key); + } + + @Override + public Object decodeMapValue(ByteBuffer bytes) { + return codec.decodeMapValue(bytes); + } + + @Override + public Object decodeMapKey(ByteBuffer bytes) { + return codec.decodeMapKey(bytes); + } + +} diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index 16bf6da1e..4d3b97597 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -15,13 +15,18 @@ */ package org.redisson.connection; +import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.lambdaworks.redis.RedisConnection; +import com.lambdaworks.redis.RedisConnectionException; import com.lambdaworks.redis.codec.RedisCodec; import com.lambdaworks.redis.pubsub.RedisPubSubConnection; @@ -33,14 +38,37 @@ abstract class BaseLoadBalancer implements LoadBalancer { private String password; - List clients; + final Queue clients = new ConcurrentLinkedQueue(); - public void init(List clients, RedisCodec codec, String password) { - this.clients = clients; + public void init(RedisCodec codec, String password) { this.codec = codec; this.password = password; } + public void add(ConnectionEntry entry) { + clients.add(entry); + } + + public void remove(String host, int port) { + InetSocketAddress addr = new InetSocketAddress(host, port); + for (Iterator iterator = clients.iterator(); iterator.hasNext();) { + ConnectionEntry entry = iterator.next(); + if (!entry.getClient().getAddr().equals(addr)) { + continue; + } + + log.info("slave {} removed", entry.getClient().getAddr()); + iterator.remove(); + // TODO re-attach listeners + for (RedisPubSubConnection conn : entry.getSubscribeConnections()) { + conn.getListeners(); + } + entry.shutdown(); + log.info("slave {} shutdown", entry.getClient().getAddr()); + break; + } + } + @SuppressWarnings("unchecked") public RedisPubSubConnection nextPubSubConnection() { List clientsCopy = new ArrayList(clients); @@ -60,15 +88,21 @@ abstract class BaseLoadBalancer implements LoadBalancer { if (!entry.getSubscribeConnectionsSemaphore().tryAcquire()) { clientsCopy.remove(index); } else { - RedisPubSubConnection conn = entry.getSubscribeConnections().poll(); - if (conn != null) { + try { + RedisPubSubConnection conn = entry.getSubscribeConnections().poll(); + if (conn != null) { + return conn; + } + conn = entry.getClient().connectPubSub(codec); + if (password != null) { + conn.auth(password); + } return conn; + } catch (RedisConnectionException e) { + // TODO connection scoring + log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr()); + clientsCopy.remove(index); } - conn = entry.getClient().connectPubSub(codec); - if (password != null) { - conn.auth(password); - } - return conn; } } } @@ -95,11 +129,17 @@ abstract class BaseLoadBalancer implements LoadBalancer { if (conn != null) { return conn; } - conn = entry.getClient().connect(codec); - if (password != null) { - conn.auth(password); + try { + conn = entry.getClient().connect(codec); + if (password != null) { + conn.auth(password); + } + return conn; + } catch (RedisConnectionException e) { + // TODO connection scoring + log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr()); + clientsCopy.remove(index); } - return conn; } } } diff --git a/src/main/java/org/redisson/connection/ConnectionEntry.java b/src/main/java/org/redisson/connection/ConnectionEntry.java index 2efeb5ef9..901a9231e 100644 --- a/src/main/java/org/redisson/connection/ConnectionEntry.java +++ b/src/main/java/org/redisson/connection/ConnectionEntry.java @@ -33,9 +33,12 @@ public class ConnectionEntry { private final Queue subscribeConnections = new ConcurrentLinkedQueue(); private final Queue connections = new ConcurrentLinkedQueue(); + private final int poolSize; + public ConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize) { super(); this.client = client; + this.poolSize = poolSize; this.connectionsSemaphore = new Semaphore(poolSize); this.subscribeConnectionsSemaphore = new Semaphore(subscribePoolSize); } @@ -44,6 +47,11 @@ public class ConnectionEntry { return client; } + public void shutdown() { + connectionsSemaphore.acquireUninterruptibly(poolSize); + client.shutdown(); + } + public Semaphore getSubscribeConnectionsSemaphore() { return subscribeConnectionsSemaphore; } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index e22673676..2ccda918b 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -47,6 +47,8 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection; //TODO ping support public interface ConnectionManager { + void changeMaster(String host, int port); + FutureListener createReleaseWriteListener(final RedisConnection conn); FutureListener createReleaseReadListener(final RedisConnection conn); diff --git a/src/main/java/org/redisson/connection/LoadBalancer.java b/src/main/java/org/redisson/connection/LoadBalancer.java index eaeeb4bba..2419a37bd 100644 --- a/src/main/java/org/redisson/connection/LoadBalancer.java +++ b/src/main/java/org/redisson/connection/LoadBalancer.java @@ -23,7 +23,11 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection; public interface LoadBalancer { - void init(List clients, RedisCodec codec, String password); + void init(RedisCodec codec, String password); + + void add(ConnectionEntry entry); + + void remove(String host, int port); RedisConnection nextConnection(); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 7db7eb376..764d4b10d 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -53,7 +53,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private EventLoopGroup group; - private final List slaveConnections = new ArrayList(); private final Queue masterConnections = new ConcurrentLinkedQueue(); private final Queue pubSubConnections = new ConcurrentLinkedQueue(); @@ -77,33 +76,28 @@ public class MasterSlaveConnectionManager implements ConnectionManager { void init(MasterSlaveConnectionConfig config, Config cfg) { this.group = new NioEventLoopGroup(cfg.getThreads()); this.config = config; + this.codec = new RedisCodecWrapper(cfg.getCodec()); + + balancer = config.getLoadBalancer(); + balancer.init(codec, config.getPassword()); for (URI address : this.config.getSlaveAddresses()) { RedisClient client = new RedisClient(group, address.getHost(), address.getPort()); slaveClients.add(client); - slaveConnections.add(new ConnectionEntry(client, + balancer.add(new ConnectionEntry(client, this.config.getSlaveConnectionPoolSize(), this.config.getSlaveSubscriptionConnectionPoolSize())); } - masterClient = new RedisClient(group, this.config.getMasterAddress().getHost(), this.config.getMasterAddress().getPort()); - - codec = new RedisCodecWrapper(cfg.getCodec()); - if (!slaveConnections.isEmpty()) { - balancer = config.getLoadBalancer(); - balancer.init(slaveConnections, codec, config.getPassword()); - } + masterClient = new RedisClient(group, this.config.getMasterAddress().getHost(), this.config.getMasterAddress().getPort()); masterConnectionsSemaphore = new Semaphore(this.config.getMasterConnectionPoolSize()); } public void changeMaster(String host, int port) { - // TODO async - masterClient.shutdown(); - + RedisClient oldMaster = masterClient; masterClient = new RedisClient(group, host, port); - // TODO - // 1. remove slave - // 2. re-attach listeners - // 3. remove dead slave + // TODO async & re-attach listeners + balancer.remove(host, port); + oldMaster.shutdown(); } @Override @@ -131,11 +125,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager { acquireMasterConnection(); RedisConnection conn = masterConnections.poll(); - if (conn == null) { - conn = masterClient.connect(codec); - if (config.getPassword() != null) { - conn.auth(config.getPassword()); - } + if (conn != null) { + return conn; + } + + conn = masterClient.connect(codec); + if (config.getPassword() != null) { + conn.auth(config.getPassword()); } return conn; } @@ -231,10 +227,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - void releaseMasterConnection() { - masterConnectionsSemaphore.release(); - } - @Override public void unsubscribe(PubSubConnectionEntry entry, String channelName) { if (entry.hasListeners(channelName)) { @@ -254,7 +246,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { public void releaseWrite(RedisConnection сonnection) { masterConnections.add(сonnection); - releaseMasterConnection(); + masterConnectionsSemaphore.release(); } public void releaseRead(RedisConnection сonnection) { diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java new file mode 100644 index 000000000..709c1fd39 --- /dev/null +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -0,0 +1,124 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.connection; + +import io.netty.channel.nio.NioEventLoopGroup; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import org.redisson.Config; +import org.redisson.MasterSlaveConnectionConfig; +import org.redisson.Redisson; +import org.redisson.SentinelConnectionConfig; +import org.redisson.codec.StringCodec; +import org.redisson.core.MessageListener; +import org.redisson.core.RTopic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.lambdaworks.redis.RedisAsyncConnection; +import com.lambdaworks.redis.RedisClient; + +public class SentinelConnectionManager extends MasterSlaveConnectionManager { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final List sentinels = new ArrayList(); + + public SentinelConnectionManager(final SentinelConnectionConfig cfg, Config config) { + init(cfg, config); + } + + private void init(final SentinelConnectionConfig cfg, final Config config) { + final MasterSlaveConnectionConfig c = new MasterSlaveConnectionConfig(); + for (URI addr : cfg.getSentinelAddresses()) { + RedisClient client = new RedisClient(new NioEventLoopGroup(1), addr.getHost(), addr.getPort()); + RedisAsyncConnection connection = client.connectAsync(); + + // TODO async + List master = connection.getMasterAddrByKey(cfg.getMasterName()).awaitUninterruptibly().getNow(); + String masterHost = master.get(0) + ":" + master.get(1); + c.setMasterAddress(masterHost); + log.info("master: {}", masterHost); + + // TODO async + List> slaves = connection.slaves(cfg.getMasterName()).awaitUninterruptibly().getNow(); + for (Map map : slaves) { + String ip = map.get("ip"); + String port = map.get("port"); + log.info("slave: {}:{}", ip, port); + c.addSlaveAddress(ip + ":" + port); + } + if (slaves.isEmpty()) { + log.info("master added as slave"); + c.addSlaveAddress(masterHost); + } + + client.shutdown(); + break; + } + + final AtomicReference master = new AtomicReference(); + for (final URI addr : cfg.getSentinelAddresses()) { + Config sc = new Config(); + sc.setCodec(new StringCodec()); + sc.useSingleConnection().setAddress(addr.getHost() + ":" + addr.getPort()); + Redisson r = Redisson.create(sc); + sentinels.add(r); + + final RTopic t = r.getTopic("+switch-master"); + t.addListener(new MessageListener() { + @Override + public void onMessage(String msg) { + String[] parts = msg.split(" "); + + if (parts.length > 3) { + if (cfg.getMasterName().equals(parts[0])) { + String ip = parts[3]; + String port = parts[4]; + + String current = master.get(); + String newMaster = ip + ":" + port; + if (!newMaster.equals(current) + && master.compareAndSet(current, newMaster)) { + log.debug("changing master to {}:{}", ip, port); + changeMaster(ip, Integer.valueOf(port)); + } + } + } else { + log.error("Invalid message: {} from Sentinel({}:{}) on channel {}", msg, addr.getHost(), addr.getPort(), t.getName()); + } + } + }); + } + + init(c, config); + } + + @Override + public void shutdown() { + for (Redisson sentinel : sentinels) { + sentinel.shutdown(); + } + + super.shutdown(); + } +} + diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml new file mode 100644 index 000000000..c9671a0e0 --- /dev/null +++ b/src/test/resources/logback.xml @@ -0,0 +1,40 @@ + + + + + + + %d{yyyy.MM.dd HH:mm:ss.SSS} %-5level %c{0} : %msg%n + + + + + + + + + + + + + + + + +