Sentinel and Cluster support. Spring Data Redis integration. #1373

pull/1547/head
Nikita 7 years ago
parent a489a82f5d
commit 1ca20f25bb

@ -267,6 +267,7 @@
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
@ -274,6 +275,18 @@
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>[3.1,5.0)</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.10.19</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session</artifactId>

@ -70,6 +70,8 @@ import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.client.protocol.decoder.StringReplayDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.spring.data.connection.StringToListConvertor;
import org.redisson.spring.data.connection.TimeLongObjectDecoder;
/**
*
@ -148,6 +150,7 @@ public interface RedisCommands {
RedisCommand<String> PING = new RedisCommand<String>("PING", new ListObjectDecoder<String>(0));
RedisStrictCommand<Boolean> PING_BOOL = new RedisStrictCommand<Boolean>("PING", new BooleanNotNullReplayConvertor());
RedisStrictCommand<Void> SHUTDOWN = new RedisStrictCommand<Void>("SHUTDOWN", new VoidReplayConvertor());
RedisStrictCommand<Void> UNWATCH = new RedisStrictCommand<Void>("UNWATCH", new VoidReplayConvertor());
RedisStrictCommand<Void> WATCH = new RedisStrictCommand<Void>("WATCH", new VoidReplayConvertor());
RedisStrictCommand<Void> MULTI = new RedisStrictCommand<Void>("MULTI", new VoidReplayConvertor());
@ -262,6 +265,11 @@ public interface RedisCommands {
RedisStrictCommand<String> CLIENT_GETNAME = new RedisStrictCommand<String>("CLIENT", "GETNAME", new StringDataDecoder());
RedisStrictCommand<Void> FLUSHDB = new RedisStrictCommand<Void>("FLUSHDB", new VoidReplayConvertor());
RedisStrictCommand<Void> FLUSHALL = new RedisStrictCommand<Void>("FLUSHALL", new VoidReplayConvertor());
RedisStrictCommand<Void> SAVE = new RedisStrictCommand<Void>("SAVE", new VoidReplayConvertor());
RedisStrictCommand<Long> LASTSAVE = new RedisStrictCommand<Long>("LASTSAVE");
RedisStrictCommand<Void> BGSAVE = new RedisStrictCommand<Void>("BGSAVE", new VoidReplayConvertor());
RedisStrictCommand<Void> BGREWRITEAOF = new RedisStrictCommand<Void>("BGREWRITEAOF", new VoidReplayConvertor());
RedisStrictCommand<Void> FLUSHDB_ASYNC = new RedisStrictCommand<Void>("FLUSHDB", "ASYNC", new VoidReplayConvertor());
RedisStrictCommand<Void> FLUSHALL_ASYNC = new RedisStrictCommand<Void>("FLUSHALL", "ASYNC", new VoidReplayConvertor());
@ -361,10 +369,17 @@ public interface RedisCommands {
RedisStrictCommand<List<ClusterNodeInfo>> CLUSTER_NODES = new RedisStrictCommand<List<ClusterNodeInfo>>("CLUSTER", "NODES", new ClusterNodesDecoder(false));
RedisStrictCommand<List<ClusterNodeInfo>> CLUSTER_NODES_SSL = new RedisStrictCommand<List<ClusterNodeInfo>>("CLUSTER", "NODES", new ClusterNodesDecoder(true));
RedisStrictCommand<Long> TIME_LONG = new RedisStrictCommand<Long>("TIME", new TimeLongObjectDecoder());
RedisStrictCommand<Time> TIME = new RedisStrictCommand<Time>("TIME", new TimeObjectDecoder());
RedisStrictCommand<Map<String, String>> CLUSTER_INFO = new RedisStrictCommand<Map<String, String>>("CLUSTER", "INFO", new StringMapDataDecoder());
RedisStrictCommand<Void> SENTINEL_FAILOVER = new RedisStrictCommand<Void>("SENTINEL", "FAILOVER", new VoidReplayConvertor());
RedisStrictCommand<Void> SENTINEL_REMOVE = new RedisStrictCommand<Void>("SENTINEL", "REMOVE", new VoidReplayConvertor());
RedisStrictCommand<Void> SENTINEL_MONITOR = new RedisStrictCommand<Void>("SENTINEL", "MONITOR", new VoidReplayConvertor());
RedisStrictCommand<List<String>> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<List<String>>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder());
RedisCommand<List<Map<String, String>>> SENTINEL_MASTERS = new RedisCommand<List<Map<String, String>>>("SENTINEL", "MASTERS",
new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectListReplayDecoder<String>(ListMultiDecoder.RESET), new ListResultReplayDecoder()));
RedisCommand<List<Map<String, String>>> SENTINEL_SLAVES = new RedisCommand<List<Map<String, String>>>("SENTINEL", "SLAVES",
new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectListReplayDecoder<String>(ListMultiDecoder.RESET), new ListResultReplayDecoder()));
RedisCommand<List<Map<String, String>>> SENTINEL_SENTINELS = new RedisCommand<List<Map<String, String>>>("SENTINEL", "SENTINELS",
@ -375,10 +390,17 @@ public interface RedisCommands {
RedisStrictCommand<Void> CLUSTER_FORGET = new RedisStrictCommand<Void>("CLUSTER", "FORGET");
RedisCommand<Object> CLUSTER_SLOTS = new RedisCommand<Object>("CLUSTER", "SLOTS", new SlotsDecoder());
RedisStrictCommand<Void> CLUSTER_RESET = new RedisStrictCommand<Void>("CLUSTER", "RESET");
RedisStrictCommand<Void> CLUSTER_DELSLOTS = new RedisStrictCommand<Void>("CLUSTER", "DELSLOTS");
RedisStrictCommand<Long> CLUSTER_COUNTKEYSINSLOT = new RedisStrictCommand<Long>("CLUSTER", "COUNTKEYSINSLOT");
RedisStrictCommand<List<String>> CLUSTER_GETKEYSINSLOT = new RedisStrictCommand<List<String>>("CLUSTER", "GETKEYSINSLOT", new StringListReplayDecoder());
RedisStrictCommand<Void> CLUSTER_SETSLOT = new RedisStrictCommand<Void>("CLUSTER", "SETSLOT");
RedisStrictCommand<Void> CLUSTER_MEET = new RedisStrictCommand<Void>("CLUSTER", "MEET");
RedisStrictCommand<List<String>> CONFIG_GET = new RedisStrictCommand<List<String>>("CONFIG", "GET", new StringListReplayDecoder());
RedisStrictCommand<Void> CONFIG_SET = new RedisStrictCommand<Void>("CONFIG", "SET", new VoidReplayConvertor());
RedisStrictCommand<Void> CONFIG_RESETSTAT = new RedisStrictCommand<Void>("CONFIG", "RESETSTAT", new VoidReplayConvertor());
RedisStrictCommand<List<String>> CLIENT_LIST = new RedisStrictCommand<List<String>>("CLIENT", "LIST", new StringToListConvertor());
RedisStrictCommand<Map<String, String>> INFO_ALL = new RedisStrictCommand<Map<String, String>>("INFO", "ALL", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_DEFAULT = new RedisStrictCommand<Map<String, String>>("INFO", "DEFAULT", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_SERVER = new RedisStrictCommand<Map<String, String>>("INFO", "SERVER", new StringMapDataDecoder());

@ -32,7 +32,6 @@ public class TimeObjectDecoder implements MultiDecoder<Time> {
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
// TODO Auto-generated method stub
return null;
}

@ -0,0 +1,42 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
/**
*
* @author Nikita Koksharov
*
*/
public class TimeLongObjectDecoder implements MultiDecoder<Long> {
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
@Override
public Long decode(List<Object> parts, State state) {
return ((Long)parts.get(0)) * 1000L + ((Long)parts.get(1)) / 1000L;
}
}

@ -102,5 +102,7 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readRandomAsync(Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readRandomAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params);
}

@ -271,14 +271,21 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <T, R> RFuture<R> readRandomAsync(Codec codec, RedisCommand<T> command, Object... params) {
final RPromise<R> mainPromise = createPromise();
final List<MasterSlaveEntry> nodes = new ArrayList<MasterSlaveEntry>(connectionManager.getEntrySet());
RPromise<R> mainPromise = createPromise();
List<MasterSlaveEntry> nodes = new ArrayList<MasterSlaveEntry>(connectionManager.getEntrySet());
Collections.shuffle(nodes);
retryReadRandomAsync(codec, command, mainPromise, nodes, params);
return mainPromise;
}
@Override
public <T, R> RFuture<R> readRandomAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
retryReadRandomAsync(codec, command, mainPromise, Collections.singletonList(entry), params);
return mainPromise;
}
private <R, T> void retryReadRandomAsync(final Codec codec, final RedisCommand<T> command, final RPromise<R> mainPromise,
final List<MasterSlaveEntry> nodes, final Object... params) {
final RPromise<R> attemptPromise = new RedissonPromise<R>();

@ -413,6 +413,10 @@ public class Config {
public boolean isClusterConfig() {
return clusterServersConfig != null;
}
public boolean isSentinelConfig() {
return sentinelServersConfig != null;
}
public int getThreads() {
return threads;

@ -18,6 +18,7 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@ -536,6 +537,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
res.setDatabase(((SentinelServersConfig)cfg).getDatabase());
return res;
}
public Collection<RedisClient> getSentinels() {
return sentinels.values();
}
@Override
public void shutdown() {

@ -0,0 +1,120 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.misc.URIBuilder;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisClusterNode.Flag;
import org.springframework.data.redis.connection.RedisClusterNode.LinkState;
import org.springframework.data.redis.connection.RedisClusterNode.RedisClusterNodeBuilder;
import org.springframework.data.redis.connection.RedisClusterNode.SlotRange;
import org.springframework.data.redis.connection.RedisNode.NodeType;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
/**
*
* @author Nikita Koksharov
*
*/
public class RedisClusterNodeDecoder implements Decoder<List<RedisClusterNode>> {
@Override
public List<RedisClusterNode> decode(ByteBuf buf, State state) throws IOException {
String response = buf.toString(CharsetUtil.UTF_8);
List<RedisClusterNode> nodes = new ArrayList<RedisClusterNode>();
for (String nodeInfo : response.split("\n")) {
String[] params = nodeInfo.split(" ");
String nodeId = params[0];
String flagsStr = params[2];
Set<Flag> flags = EnumSet.noneOf(Flag.class);
for (String flag : flagsStr.split(",")) {
String flagValue = flag.toUpperCase().replaceAll("\\?", "");
flags.add(Flag.valueOf(flagValue));
}
URI address = null;
if (!flags.contains(Flag.NOADDR)) {
String addr = params[1].split("@")[0];
address = URIBuilder.create("redis://" + addr);
}
String masterId = params[3];
if ("-".equals(masterId)) {
masterId = null;
}
Set<Integer> slotsCollection = new HashSet<Integer>();
LinkState linkState = null;
if (params.length >= 8 && params[7] != null) {
linkState = LinkState.valueOf(params[7].toUpperCase());
}
if (params.length > 8) {
for (int i = 0; i < params.length - 8; i++) {
String slots = params[i + 8];
if (slots.indexOf("-<-") != -1 || slots.indexOf("->-") != -1) {
continue;
}
String[] parts = slots.split("-");
if(parts.length == 1) {
slotsCollection.add(Integer.valueOf(parts[0]));
} else if(parts.length == 2) {
for (int j = Integer.valueOf(parts[0]); j < Integer.valueOf(parts[1]) + 1; j++) {
slotsCollection.add(j);
}
}
}
}
NodeType type = null;
if (flags.contains(Flag.MASTER)) {
type = NodeType.MASTER;
} else if (flags.contains(Flag.SLAVE)) {
type = NodeType.SLAVE;
}
RedisClusterNodeBuilder builder = RedisClusterNode.newRedisClusterNode()
.linkState(linkState)
.slaveOf(masterId)
.serving(new SlotRange(slotsCollection))
.withId(nodeId)
.promotedAs(type)
.withFlags(flags);
if (address != null) {
builder.listeningAt(address.getHost(), address.getPort());
}
nodes.add(builder.build());
}
return nodes;
}
}

@ -15,18 +15,39 @@
*/
package org.redisson.spring.data.connection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.connection.MasterSlaveEntry;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisClusterNode.SlotRange;
import org.springframework.data.redis.connection.convert.StringToRedisClientInfoConverter;
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.Assert;
import io.netty.util.CharsetUtil;
/**
*
@ -35,218 +56,325 @@ import org.springframework.data.redis.core.types.RedisClientInfo;
*/
public class RedissonClusterConnection extends RedissonConnection implements RedisClusterConnection {
private static final RedisStrictCommand<List<RedisClusterNode>> CLUSTER_NODES =
new RedisStrictCommand<List<RedisClusterNode>>("CLUSTER", "NODES", new RedisClusterNodeDecoder());
public RedissonClusterConnection(RedissonClient redisson) {
super(redisson);
}
@Override
public Iterable<RedisClusterNode> clusterGetNodes() {
// TODO Auto-generated method stub
return null;
return read(null, StringCodec.INSTANCE, CLUSTER_NODES);
}
@Override
public Collection<RedisClusterNode> clusterGetSlaves(RedisClusterNode master) {
// TODO Auto-generated method stub
return null;
Iterable<RedisClusterNode> res = clusterGetNodes();
RedisClusterNode masterNode = null;
for (Iterator<RedisClusterNode> iterator = res.iterator(); iterator.hasNext();) {
RedisClusterNode redisClusterNode = iterator.next();
if (master.getHost().equals(redisClusterNode.getHost())
&& master.getPort().equals(redisClusterNode.getPort())) {
masterNode = redisClusterNode;
break;
}
}
if (masterNode == null) {
throw new IllegalStateException("Unable to find master node: " + master);
}
for (Iterator<RedisClusterNode> iterator = res.iterator(); iterator.hasNext();) {
RedisClusterNode redisClusterNode = iterator.next();
if (redisClusterNode.getMasterId() == null
|| !redisClusterNode.getMasterId().equals(masterNode.getId())) {
iterator.remove();
}
}
return (Collection<RedisClusterNode>) res;
}
@Override
public Map<RedisClusterNode, Collection<RedisClusterNode>> clusterGetMasterSlaveMap() {
// TODO Auto-generated method stub
return null;
Iterable<RedisClusterNode> res = clusterGetNodes();
Set<RedisClusterNode> masters = new HashSet<RedisClusterNode>();
for (Iterator<RedisClusterNode> iterator = res.iterator(); iterator.hasNext();) {
RedisClusterNode redisClusterNode = iterator.next();
if (redisClusterNode.isMaster()) {
masters.add(redisClusterNode);
}
}
Map<RedisClusterNode, Collection<RedisClusterNode>> result = new HashMap<RedisClusterNode, Collection<RedisClusterNode>>();
for (Iterator<RedisClusterNode> iterator = res.iterator(); iterator.hasNext();) {
RedisClusterNode redisClusterNode = iterator.next();
for (RedisClusterNode masterNode : masters) {
if (redisClusterNode.getMasterId() != null
&& redisClusterNode.getMasterId().equals(masterNode.getId())) {
Collection<RedisClusterNode> list = result.get(masterNode);
if (list == null) {
list = new ArrayList<RedisClusterNode>();
result.put(masterNode, list);
}
list.add(redisClusterNode);
}
}
}
return result;
}
@Override
public Integer clusterGetSlotForKey(byte[] key) {
// TODO Auto-generated method stub
return null;
RFuture<Integer> f = executorService.readAsync((String)null, StringCodec.INSTANCE, RedisCommands.KEYSLOT, key);
return syncFuture(f);
}
@Override
public RedisClusterNode clusterGetNodeForSlot(int slot) {
// TODO Auto-generated method stub
Iterable<RedisClusterNode> res = clusterGetNodes();
for (RedisClusterNode redisClusterNode : res) {
if (redisClusterNode.isMaster() && redisClusterNode.getSlotRange().contains(slot)) {
return redisClusterNode;
}
}
return null;
}
@Override
public RedisClusterNode clusterGetNodeForKey(byte[] key) {
// TODO Auto-generated method stub
return null;
int slot = executorService.getConnectionManager().calcSlot(key);
return clusterGetNodeForSlot(slot);
}
@Override
public ClusterInfo clusterGetClusterInfo() {
// TODO Auto-generated method stub
return null;
RFuture<Map<String, String>> f = executorService.readAsync((String)null, StringCodec.INSTANCE, RedisCommands.CLUSTER_INFO);
syncFuture(f);
Properties props = new Properties();
for (Entry<String, String> entry : f.getNow().entrySet()) {
props.setProperty(entry.getKey(), entry.getValue());
}
return new ClusterInfo(props);
}
@Override
public void clusterAddSlots(RedisClusterNode node, int... slots) {
// TODO Auto-generated method stub
MasterSlaveEntry entry = getEntry(node);
List<Integer> params = convert(slots);
RFuture<Map<String, String>> f = executorService.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.CLUSTER_ADDSLOTS, params.toArray());
syncFuture(f);
}
protected List<Integer> convert(int... slots) {
List<Integer> params = new ArrayList<Integer>();
for (int slot : slots) {
params.add(slot);
}
return params;
}
@Override
public void clusterAddSlots(RedisClusterNode node, SlotRange range) {
// TODO Auto-generated method stub
clusterAddSlots(node, range.getSlotsArray());
}
@Override
public Long clusterCountKeysInSlot(int slot) {
// TODO Auto-generated method stub
return null;
RedisClusterNode node = clusterGetNodeForSlot(slot);
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(new InetSocketAddress(node.getHost(), node.getPort()));
RFuture<Long> f = executorService.readAsync(entry, StringCodec.INSTANCE, RedisCommands.CLUSTER_COUNTKEYSINSLOT, slot);
return syncFuture(f);
}
@Override
public void clusterDeleteSlots(RedisClusterNode node, int... slots) {
// TODO Auto-generated method stub
MasterSlaveEntry entry = getEntry(node);
List<Integer> params = convert(slots);
RFuture<Long> f = executorService.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.CLUSTER_DELSLOTS, params.toArray());
syncFuture(f);
}
@Override
public void clusterDeleteSlotsInRange(RedisClusterNode node, SlotRange range) {
// TODO Auto-generated method stub
clusterDeleteSlots(node, range.getSlotsArray());
}
@Override
public void clusterForget(RedisClusterNode node) {
// TODO Auto-generated method stub
RFuture<Void> f = executorService.writeAsync((String)null, StringCodec.INSTANCE, RedisCommands.CLUSTER_FORGET, node.getId());
syncFuture(f);
}
@Override
public void clusterMeet(RedisClusterNode node) {
// TODO Auto-generated method stub
Assert.notNull(node, "Cluster node must not be null for CLUSTER MEET command!");
Assert.hasText(node.getHost(), "Node to meet cluster must have a host!");
Assert.isTrue(node.getPort() > 0, "Node to meet cluster must have a port greater 0!");
RFuture<Void> f = executorService.writeAsync((String)null, StringCodec.INSTANCE, RedisCommands.CLUSTER_MEET, node.getHost(), node.getPort());
syncFuture(f);
}
@Override
public void clusterSetSlot(RedisClusterNode node, int slot, AddSlots mode) {
// TODO Auto-generated method stub
MasterSlaveEntry entry = getEntry(node);
RFuture<Map<String, String>> f = executorService.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.CLUSTER_SETSLOT, slot, mode);
syncFuture(f);
}
private static final RedisStrictCommand<List<String>> CLUSTER_GETKEYSINSLOT = new RedisStrictCommand<List<String>>("CLUSTER", "GETKEYSINSLOT", new ObjectListReplayDecoder<String>());
@Override
public List<byte[]> clusterGetKeysInSlot(int slot, Integer count) {
// TODO Auto-generated method stub
return null;
RFuture<List<byte[]>> f = executorService.readAsync((String)null, ByteArrayCodec.INSTANCE, CLUSTER_GETKEYSINSLOT, slot, count);
return syncFuture(f);
}
@Override
public void clusterReplicate(RedisClusterNode master, RedisClusterNode slave) {
// TODO Auto-generated method stub
MasterSlaveEntry entry = getEntry(master);
RFuture<Long> f = executorService.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.CLUSTER_REPLICATE, slave.getId());
syncFuture(f);
}
@Override
public String ping(RedisClusterNode node) {
// TODO Auto-generated method stub
return null;
return execute(node, RedisCommands.PING);
}
@Override
public void bgReWriteAof(RedisClusterNode node) {
// TODO Auto-generated method stub
execute(node, RedisCommands.BGREWRITEAOF);
}
@Override
public void bgSave(RedisClusterNode node) {
// TODO Auto-generated method stub
execute(node, RedisCommands.BGSAVE);
}
@Override
public Long lastSave(RedisClusterNode node) {
// TODO Auto-generated method stub
return null;
return execute(node, RedisCommands.LASTSAVE);
}
@Override
public void save(RedisClusterNode node) {
// TODO Auto-generated method stub
execute(node, RedisCommands.SAVE);
}
@Override
public Long dbSize(RedisClusterNode node) {
// TODO Auto-generated method stub
return null;
return execute(node, RedisCommands.DBSIZE);
}
private <T> T execute(RedisClusterNode node, RedisCommand<T> command) {
MasterSlaveEntry entry = getEntry(node);
RFuture<T> f = executorService.writeAsync(entry, StringCodec.INSTANCE, command);
return syncFuture(f);
}
protected MasterSlaveEntry getEntry(RedisClusterNode node) {
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(new InetSocketAddress(node.getHost(), node.getPort()));
return entry;
}
@Override
public void flushDb(RedisClusterNode node) {
// TODO Auto-generated method stub
execute(node, RedisCommands.FLUSHDB);
}
@Override
public void flushAll(RedisClusterNode node) {
// TODO Auto-generated method stub
execute(node, RedisCommands.FLUSHALL);
}
@Override
public Properties info(RedisClusterNode node) {
// TODO Auto-generated method stub
return null;
Map<String, String> info = execute(node, RedisCommands.INFO_ALL);
Properties result = new Properties();
for (Entry<String, String> entry : info.entrySet()) {
result.setProperty(entry.getKey(), entry.getValue());
}
return result;
}
@Override
public Properties info(RedisClusterNode node, String section) {
// TODO Auto-generated method stub
return null;
RedisStrictCommand<Map<String, String>> command = new RedisStrictCommand<Map<String, String>>("INFO", section, new StringMapDataDecoder());
Map<String, String> info = execute(node, command);
Properties result = new Properties();
for (Entry<String, String> entry : info.entrySet()) {
result.setProperty(entry.getKey(), entry.getValue());
}
return result;
}
@Override
public Set<byte[]> keys(RedisClusterNode node, byte[] pattern) {
// TODO Auto-generated method stub
return null;
RFuture<Collection<String>> f = executorService.readAllAsync(RedisCommands.KEYS, pattern);
Collection<String> keys = syncFuture(f);
Set<byte[]> result = new HashSet<byte[]>();
for (String key : keys) {
result.add(key.getBytes(CharsetUtil.UTF_8));
}
return result;
}
@Override
public byte[] randomKey(RedisClusterNode node) {
// TODO Auto-generated method stub
return null;
MasterSlaveEntry entry = getEntry(node);
RFuture<byte[]> f = executorService.readRandomAsync(entry, ByteArrayCodec.INSTANCE, RedisCommands.RANDOM_KEY);
return syncFuture(f);
}
@Override
public void shutdown(RedisClusterNode node) {
// TODO Auto-generated method stub
MasterSlaveEntry entry = getEntry(node);
RFuture<Void> f = executorService.readAsync(entry, ByteArrayCodec.INSTANCE, RedisCommands.SHUTDOWN);
syncFuture(f);
}
@Override
public List<String> getConfig(RedisClusterNode node, String pattern) {
// TODO Auto-generated method stub
return null;
MasterSlaveEntry entry = getEntry(node);
RFuture<List<String>> f = executorService.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.CONFIG_GET, pattern);
return syncFuture(f);
}
@Override
public void setConfig(RedisClusterNode node, String param, String value) {
// TODO Auto-generated method stub
MasterSlaveEntry entry = getEntry(node);
RFuture<Void> f = executorService.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.CONFIG_SET, param, value);
syncFuture(f);
}
@Override
public void resetConfigStats(RedisClusterNode node) {
// TODO Auto-generated method stub
MasterSlaveEntry entry = getEntry(node);
RFuture<Void> f = executorService.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.CONFIG_RESETSTAT);
syncFuture(f);
}
@Override
public Long time(RedisClusterNode node) {
// TODO Auto-generated method stub
return null;
MasterSlaveEntry entry = getEntry(node);
RFuture<Long> f = executorService.readAsync(entry, LongCodec.INSTANCE, RedisCommands.TIME_LONG);
return syncFuture(f);
}
private static final StringToRedisClientInfoConverter CONVERTER = new StringToRedisClientInfoConverter();
@Override
public List<RedisClientInfo> getClientList(RedisClusterNode node) {
// TODO Auto-generated method stub
return null;
MasterSlaveEntry entry = getEntry(node);
RFuture<List<String>> f = executorService.readAsync(entry, StringCodec.INSTANCE, RedisCommands.CLIENT_LIST);
List<String> list = syncFuture(f);
return CONVERTER.convert(list.toArray(new String[list.size()]));
}
}

@ -110,9 +110,9 @@ import io.netty.util.concurrent.FutureListener;
public class RedissonConnection extends AbstractRedisConnection {
private boolean closed;
private final Redisson redisson;
protected final Redisson redisson;
private CommandAsyncService executorService;
CommandAsyncService executorService;
private RedissonSubscription subscription;
public RedissonConnection(RedissonClient redisson) {
@ -223,7 +223,7 @@ public class RedissonConnection extends AbstractRedisConnection {
return ReflectionUtils.invokeMethod(method, this, Arrays.asList(args).toArray());
}
private <V> V syncFuture(RFuture<V> future) {
<V> V syncFuture(RFuture<V> future) {
try {
return executorService.get(future);
} catch (Exception ex) {
@ -813,7 +813,7 @@ public class RedissonConnection extends AbstractRedisConnection {
private final List<Integer> indexToRemove = new ArrayList<Integer>();
private int index = -1;
private <T> T write(byte[] key, Codec codec, RedisCommand<?> command, Object... params) {
<T> T write(byte[] key, Codec codec, RedisCommand<?> command, Object... params) {
RFuture<T> f = executorService.writeAsync(key, codec, command, params);
indexCommand(command);
return sync(f);
@ -828,7 +828,7 @@ public class RedissonConnection extends AbstractRedisConnection {
}
}
private <T> T read(byte[] key, Codec codec, RedisCommand<?> command, Object... params) {
<T> T read(byte[] key, Codec codec, RedisCommand<?> command, Object... params) {
RFuture<T> f = executorService.readAsync(key, codec, command, params);
indexCommand(command);
return sync(f);
@ -1703,25 +1703,19 @@ public class RedissonConnection extends AbstractRedisConnection {
throw new UnsupportedOperationException();
}
private static final RedisStrictCommand<Void> BGREWRITEAOF = new RedisStrictCommand<Void>("BGREWRITEAOF", new VoidReplayConvertor());
@Override
public void bgReWriteAof() {
write(null, StringCodec.INSTANCE, BGREWRITEAOF);
write(null, StringCodec.INSTANCE, RedisCommands.BGREWRITEAOF);
}
private static final RedisStrictCommand<Void> BGSAVE = new RedisStrictCommand<Void>("BGSAVE", new VoidReplayConvertor());
@Override
public void bgSave() {
write(null, StringCodec.INSTANCE, BGSAVE);
write(null, StringCodec.INSTANCE, RedisCommands.BGSAVE);
}
private static final RedisStrictCommand<Long> LASTSAVE = new RedisStrictCommand<Long>("LASTSAVE");
@Override
public Long lastSave() {
return write(null, StringCodec.INSTANCE, LASTSAVE);
return write(null, StringCodec.INSTANCE, RedisCommands.LASTSAVE);
}
private static final RedisStrictCommand<Void> SAVE = new RedisStrictCommand<Void>("SAVE", new VoidReplayConvertor());
@ -1804,7 +1798,7 @@ public class RedissonConnection extends AbstractRedisConnection {
throw new UnsupportedOperationException();
}
private static final RedisStrictCommand<Long> TIME = new RedisStrictCommand<Long>("TIME", new TimeObjectDecoder());
private static final RedisStrictCommand<Long> TIME = new RedisStrictCommand<Long>("TIME", new TimeLongObjectDecoder());
@Override
public Long time() {

@ -15,12 +15,18 @@
*/
package org.redisson.spring.data.connection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config;
import org.redisson.connection.SentinelConnectionManager;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.ExceptionTranslationStrategy;
import org.springframework.data.redis.PassThroughExceptionTranslationStrategy;
import org.springframework.data.redis.connection.RedisClusterConnection;
@ -36,6 +42,8 @@ import org.springframework.data.redis.connection.RedisSentinelConnection;
*/
public class RedissonConnectionFactory implements RedisConnectionFactory, InitializingBean, DisposableBean {
private final static Log log = LogFactory.getLog(RedissonConnectionFactory.class);
public static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION =
new PassThroughExceptionTranslationStrategy(new RedissonExceptionConverter());
@ -92,6 +100,9 @@ public class RedissonConnectionFactory implements RedisConnectionFactory, Initia
@Override
public RedisClusterConnection getClusterConnection() {
if (!redisson.getConfig().isClusterConfig()) {
throw new InvalidDataAccessResourceUsageException("Redisson is not in Cluster mode");
}
return new RedissonClusterConnection(redisson);
}
@ -102,8 +113,25 @@ public class RedissonConnectionFactory implements RedisConnectionFactory, Initia
@Override
public RedisSentinelConnection getSentinelConnection() {
// TODO Auto-generated method stub
return null;
if (!redisson.getConfig().isSentinelConfig()) {
throw new InvalidDataAccessResourceUsageException("Redisson is not in Sentinel mode");
}
SentinelConnectionManager manager = ((SentinelConnectionManager)((Redisson)redisson).getConnectionManager());
for (RedisClient client : manager.getSentinels()) {
org.redisson.client.RedisConnection connection = client.connect();
try {
String res = connection.sync(RedisCommands.PING);
if ("pong".equalsIgnoreCase(res)) {
return new RedissonSentinelConnection(connection);
}
} catch (Exception e) {
log.warn("Can't connect to " + client, e);
connection.closeAsync();
}
}
throw new InvalidDataAccessResourceUsageException("Sentinels are not found");
}
}

@ -0,0 +1,91 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.springframework.data.redis.connection.NamedNode;
import org.springframework.data.redis.connection.RedisSentinelConnection;
import org.springframework.data.redis.connection.RedisServer;
import org.springframework.data.redis.connection.convert.Converters;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonSentinelConnection implements RedisSentinelConnection {
private final RedisConnection connection;
public RedissonSentinelConnection(RedisConnection connection) {
this.connection = connection;
}
@Override
public void failover(NamedNode master) {
connection.sync(RedisCommands.SENTINEL_FAILOVER, master.getName());
}
private static List<RedisServer> toRedisServersList(List<Map<String, String>> source) {
List<RedisServer> servers = new ArrayList<RedisServer>(source.size());
for (Map<String, String> info : source) {
servers.add(RedisServer.newServerFrom(Converters.toProperties(info)));
}
return servers;
}
@Override
public Collection<RedisServer> masters() {
List<Map<String, String>> masters = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_MASTERS);
return toRedisServersList(masters);
}
@Override
public Collection<RedisServer> slaves(NamedNode master) {
List<Map<String, String>> slaves = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, master.getName());
return toRedisServersList(slaves);
}
@Override
public void remove(NamedNode master) {
connection.sync(RedisCommands.SENTINEL_REMOVE, master.getName());
}
@Override
public void monitor(RedisServer master) {
connection.sync(RedisCommands.SENTINEL_MONITOR, master.getName(), master.getHost(),
master.getPort().intValue(), master.getQuorum().intValue());
}
@Override
public void close() throws IOException {
connection.closeAsync();
}
@Override
public boolean isOpen() {
return !connection.isClosed();
}
}

@ -0,0 +1,40 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.protocol.convertor.SingleConvertor;
/**
*
* @author Nikita Koksharov
*
*/
public class StringToListConvertor extends SingleConvertor<List<String>> {
@Override
public List<String> convert(Object obj) {
String value = (String) obj;
List<String> result = new ArrayList<String>();
for (String entry : value.split("\r\n|\n")) {
result.add(entry);
}
return result;
}
}

@ -26,8 +26,7 @@ import org.redisson.client.protocol.decoder.MultiDecoder;
* @author Nikita Koksharov
*
*/
public class TimeObjectDecoder implements MultiDecoder<Long> {
public class TimeLongObjectDecoder implements MultiDecoder<Long> {
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {

@ -0,0 +1,22 @@
package org.redisson.spring.data.connection;
import org.junit.Before;
import org.redisson.BaseTest;
import org.redisson.RedisRunner;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.jedis.JedisConnection;
import redis.clients.jedis.Jedis;
public abstract class BaseConnectionTest extends BaseTest {
RedisConnection connection;
@Before
public void init() {
Jedis jedis = new Jedis(RedisRunner.getDefaultRedisServerBindAddressAndPort());
connection = new JedisConnection(jedis);
// connection = new RedissonConnection(redisson);
}
}

@ -0,0 +1,222 @@
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.assertj.core.api.Assertions.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisNode.NodeType;
import org.springframework.data.redis.core.types.RedisClientInfo;
public class RedissonClusterConnectionTest {
static RedissonClient redisson;
static RedissonClusterConnection connection;
static ClusterProcesses process;
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonClusterConnection(redisson);
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@Test
public void testClusterGetNodes() {
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
assertThat(nodes).hasSize(6);
for (RedisClusterNode redisClusterNode : nodes) {
assertThat(redisClusterNode.getLinkState()).isNotNull();
assertThat(redisClusterNode.getFlags()).isNotEmpty();
assertThat(redisClusterNode.getHost()).isNotNull();
assertThat(redisClusterNode.getPort()).isNotNull();
assertThat(redisClusterNode.getId()).isNotNull();
assertThat(redisClusterNode.getType()).isNotNull();
if (redisClusterNode.getType() == NodeType.MASTER) {
assertThat(redisClusterNode.getSlotRange().getSlots()).isNotEmpty();
} else {
assertThat(redisClusterNode.getMasterId()).isNotNull();
}
}
}
@Test
public void testClusterGetNodesMaster() {
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
for (RedisClusterNode redisClusterNode : nodes) {
if (redisClusterNode.getType() == NodeType.MASTER) {
Collection<RedisClusterNode> slaves = connection.clusterGetSlaves(redisClusterNode);
assertThat(slaves).hasSize(1);
}
}
}
@Test
public void testClusterGetMasterSlaveMap() {
Map<RedisClusterNode, Collection<RedisClusterNode>> map = connection.clusterGetMasterSlaveMap();
assertThat(map).hasSize(3);
for (Collection<RedisClusterNode> slaves : map.values()) {
assertThat(slaves).hasSize(1);
}
}
@Test
public void testClusterGetSlotForKey() {
Integer slot = connection.clusterGetSlotForKey("123".getBytes());
assertThat(slot).isNotNull();
}
@Test
public void testClusterGetNodeForSlot() {
RedisClusterNode node1 = connection.clusterGetNodeForSlot(1);
RedisClusterNode node2 = connection.clusterGetNodeForSlot(16000);
assertThat(node1.getId()).isNotEqualTo(node2.getId());
}
@Test
public void testClusterGetNodeForKey() {
RedisClusterNode node = connection.clusterGetNodeForKey("123".getBytes());
assertThat(node).isNotNull();
}
@Test
public void testClusterGetClusterInfo() {
ClusterInfo info = connection.clusterGetClusterInfo();
assertThat(info.getSlotsFail()).isEqualTo(0);
assertThat(info.getSlotsOk()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT);
assertThat(info.getSlotsAssigned()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT);
}
@Test
public void testClusterAddRemoveSlots() {
RedisClusterNode master = getFirstMaster();
Integer slot = master.getSlotRange().getSlots().iterator().next();
connection.clusterDeleteSlots(master, slot);
connection.clusterAddSlots(master, slot);
}
@Test
public void testClusterCountKeysInSlot() {
Long t = connection.clusterCountKeysInSlot(1);
assertThat(t).isZero();
}
@Test
public void testClusterMeetForget() {
RedisClusterNode master = getFirstMaster();
connection.clusterForget(master);
connection.clusterMeet(master);
}
@Test
public void testClusterGetKeysInSlot() {
List<byte[]> keys = connection.clusterGetKeysInSlot(12, 10);
assertThat(keys).isEmpty();
}
@Test
public void testClusterPing() {
RedisClusterNode master = getFirstMaster();
String res = connection.ping(master);
assertThat(res).isEqualTo("PONG");
}
@Test
public void testDbSize() {
RedisClusterNode master = getFirstMaster();
Long size = connection.dbSize(master);
assertThat(size).isZero();
}
@Test
public void testInfo() {
RedisClusterNode master = getFirstMaster();
Properties info = connection.info(master);
assertThat(info.size()).isGreaterThan(10);
}
@Test
public void testResetConfigStats() {
RedisClusterNode master = getFirstMaster();
connection.resetConfigStats(master);
}
@Test
public void testTime() {
RedisClusterNode master = getFirstMaster();
Long time = connection.time(master);
assertThat(time).isGreaterThan(1000);
}
@Test
public void testGetClientList() {
RedisClusterNode master = getFirstMaster();
List<RedisClientInfo> list = connection.getClientList(master);
assertThat(list.size()).isGreaterThan(10);
}
@Test
public void testSetConfig() {
RedisClusterNode master = getFirstMaster();
connection.setConfig(master, "timeout", "10");
}
@Test
public void testGetConfig() {
RedisClusterNode master = getFirstMaster();
List<String> config = connection.getConfig(master, "*");
assertThat(config.size()).isGreaterThan(20);
}
protected RedisClusterNode getFirstMaster() {
Map<RedisClusterNode, Collection<RedisClusterNode>> map = connection.clusterGetMasterSlaveMap();
RedisClusterNode master = map.keySet().iterator().next();
return master;
}
}

@ -0,0 +1,26 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
public class RedissonConnectionTest extends BaseConnectionTest {
@Test
public void testEcho() {
assertThat(connection.echo("test".getBytes())).isEqualTo("test".getBytes());
}
@Test
public void testSetGet() {
connection.set("key".getBytes(), "value".getBytes());
assertThat(connection.get("key".getBytes())).isEqualTo("value".getBytes());
}
@Test
public void testHSetGet() {
assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isTrue();
assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isEqualTo("value".getBytes());
}
}

@ -0,0 +1,45 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.*;
import java.util.List;
import org.junit.Test;
import org.redisson.BaseTest;
public class RedissonMultiConnectionTest extends BaseConnectionTest {
@Test
public void testEcho() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.multi();
assertThat(connection.echo("test".getBytes())).isNull();
assertThat(connection.exec().iterator().next()).isEqualTo("test".getBytes());
}
@Test
public void testSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.multi();
assertThat(connection.isQueueing()).isTrue();
connection.set("key".getBytes(), "value".getBytes());
assertThat(connection.get("key".getBytes())).isNull();
List<Object> result = connection.exec();
assertThat(connection.isQueueing()).isFalse();
assertThat(result.get(0)).isEqualTo("value".getBytes());
}
@Test
public void testHSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.multi();
assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isNull();
assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isNull();
List<Object> result = connection.exec();
assertThat((Boolean)result.get(0)).isTrue();
assertThat(result.get(1)).isEqualTo("value".getBytes());
}
}

@ -0,0 +1,46 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.*;
import java.util.List;
import org.junit.Test;
import org.redisson.BaseTest;
public class RedissonPipelineConnectionTest extends BaseConnectionTest {
@Test
public void testEcho() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.openPipeline();
assertThat(connection.echo("test".getBytes())).isNull();
assertThat(connection.closePipeline().iterator().next()).isEqualTo("test".getBytes());
}
@Test
public void testSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.openPipeline();
assertThat(connection.isPipelined()).isTrue();
connection.set("key".getBytes(), "value".getBytes());
assertThat(connection.get("key".getBytes())).isNull();
List<Object> result = connection.closePipeline();
assertThat(connection.isPipelined()).isFalse();
assertThat(result.get(0)).isEqualTo("value".getBytes());
}
@Test
public void testHSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.openPipeline();
assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isNull();
assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isNull();
List<Object> result = connection.closePipeline();
assertThat((Boolean)result.get(0)).isTrue();
assertThat(result.get(1)).isEqualTo("value".getBytes());
}
}

@ -0,0 +1,132 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.Collection;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.data.redis.connection.RedisSentinelConnection;
import org.springframework.data.redis.connection.RedisServer;
public class RedissonSentinelConnectionTest {
RedissonClient redisson;
RedisSentinelConnection connection;
RedisRunner.RedisProcess master;
RedisRunner.RedisProcess slave1;
RedisRunner.RedisProcess slave2;
RedisRunner.RedisProcess sentinel1;
RedisRunner.RedisProcess sentinel2;
RedisRunner.RedisProcess sentinel3;
@Before
public void before() throws FailedToStartRedisException, IOException, InterruptedException {
master = new RedisRunner()
.nosave()
.randomDir()
.run();
slave1 = new RedisRunner()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
slave2 = new RedisRunner()
.port(6381)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
Thread.sleep(5000);
Config config = new Config();
config.useSentinelServers()
.setLoadBalancer(new RandomLoadBalancer())
.addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster");
redisson = Redisson.create(config);
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
connection = factory.getSentinelConnection();
}
@After
public void after() {
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
slave2.stop();
redisson.shutdown();
}
@Test
public void testMasters() {
Collection<RedisServer> masters = connection.masters();
assertThat(masters).hasSize(1);
}
@Test
public void testSlaves() {
Collection<RedisServer> masters = connection.masters();
Collection<RedisServer> slaves = connection.slaves(masters.iterator().next());
assertThat(slaves).hasSize(2);
}
@Test
public void testRemove() {
Collection<RedisServer> masters = connection.masters();
connection.remove(masters.iterator().next());
}
@Test
public void testMonitor() {
Collection<RedisServer> masters = connection.masters();
RedisServer master = masters.iterator().next();
master.setName(master.getName() + ":");
connection.monitor(master);
}
@Test
public void testFailover() throws InterruptedException {
Collection<RedisServer> masters = connection.masters();
connection.failover(masters.iterator().next());
Thread.sleep(10000);
RedisServer newMaster = connection.masters().iterator().next();
assertThat(masters.iterator().next().getPort()).isNotEqualTo(newMaster.getPort());
}
}

@ -0,0 +1,54 @@
package org.redisson.spring.data.connection;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Test;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
public class RedissonSubscribeTest extends BaseConnectionTest {
@Test
public void testSubscribe() {
RedissonConnection connection = new RedissonConnection(redisson);
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
connection.subscribe(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
msg.set(message.getBody());
}
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
connection.publish("test".getBytes(), "msg".getBytes());
}
@Test
public void testUnSubscribe() {
RedissonConnection connection = new RedissonConnection(redisson);
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
connection.subscribe(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
msg.set(message.getBody());
}
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
}
}
Loading…
Cancel
Save