Feature - Spring Data Redis 3.3.0 integration

pull/5924/head
Nikita Koksharov 8 months ago
parent 689503a6ca
commit ee5f9a98fb

@ -0,0 +1,100 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data</artifactId>
<version>3.30.1-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<artifactId>redisson-spring-data-33</artifactId>
<packaging>jar</packaging>
<name>Redisson/Spring Data Redis v3.3.x integration</name>
<dependencies>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.hazendaz.jmockit</groupId>
<artifactId>jmockit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<forkCount>4</forkCount>
<reuseForks>true</reuseForks>
<argLine>${argLine} -javaagent:"${settings.localRepository}"/com/github/hazendaz/jmockit/jmockit/1.52.0/jmockit-1.52.0.jar</argLine>
</configuration>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.4.1</version>
<configuration>
<archive>
<manifestEntries>
<Build-Time>${maven.build.timestamp}</Build-Time>
<Automatic-Module-Name>redisson.spring.data27</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
<version>4.3</version>
<configuration>
<basedir>${basedir}</basedir>
<header>${basedir}/../../header.txt</header>
<quiet>false</quiet>
<failIfMissing>true</failIfMissing>
<aggregate>false</aggregate>
<includes>
<include>src/main/java/org/redisson/</include>
</includes>
<excludes>
<exclude>target/**</exclude>
</excludes>
<useDefaultExcludes>true</useDefaultExcludes>
<mapping>
<java>JAVADOC_STYLE</java>
</mapping>
<strictCheck>true</strictCheck>
<useDefaultMapping>true</useDefaultMapping>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,282 @@
package org.redisson;
import com.github.dockerjava.api.model.ContainerNetwork;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import org.junit.Before;
import org.junit.BeforeClass;
import org.redisson.api.NatMapper;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.Protocol;
import org.redisson.misc.RedisURI;
import org.redisson.spring.data.connection.RedissonClusterConnection;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public abstract class BaseTest {
protected static final String NOTIFY_KEYSPACE_EVENTS = "--notify-keyspace-events";
protected static final GenericContainer<?> REDIS = createRedis();
protected static final Protocol protocol = Protocol.RESP2;
protected static RedissonClient redisson;
protected static RedissonClient redissonCluster;
private static GenericContainer<?> REDIS_CLUSTER;
protected static GenericContainer<?> createRedis() {
return createRedis("7.2");
}
protected static GenericContainer<?> createRedis(String version) {
return new GenericContainer<>("redis:" + version)
.withCreateContainerCmdModifier(cmd -> {
cmd.withCmd("redis-server", "--save", "''");
})
.withExposedPorts(6379);
}
@BeforeClass
public static void beforeAll() {
if (redisson == null) {
REDIS.start();
Config config = createConfig();
redisson = Redisson.create(config);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
redisson.shutdown();
REDIS.stop();
if (redissonCluster != null) {
redissonCluster.shutdown();
redissonCluster = null;
}
if (REDIS_CLUSTER != null) {
REDIS_CLUSTER.stop();
REDIS_CLUSTER = null;
}
}));
}
}
protected static Config createConfig() {
Config config = new Config();
config.setProtocol(protocol);
config.useSingleServer()
.setAddress("redis://127.0.0.1:" + REDIS.getFirstMappedPort());
return config;
}
protected static RedissonClient createInstance() {
Config config = createConfig();
return Redisson.create(config);
}
protected void testWithParams(Consumer<RedissonClient> redissonCallback, String... params) {
GenericContainer<?> redis =
new GenericContainer<>("redis:7.2")
.withCreateContainerCmdModifier(cmd -> {
List<String> args = new ArrayList<>();
args.add("redis-server");
args.addAll(Arrays.asList(params));
cmd.withCmd(args);
})
.withExposedPorts(6379);
redis.start();
Config config = new Config();
config.setProtocol(protocol);
config.useSingleServer().setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort());
RedissonClient redisson = Redisson.create(config);
try {
redissonCallback.accept(redisson);
} finally {
redisson.shutdown();
redis.stop();
}
}
protected void testInCluster(Consumer<RedissonClusterConnection> redissonCallback) {
if (redissonCluster == null) {
REDIS_CLUSTER = new GenericContainer<>("vishnunair/docker-redis-cluster")
.withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384)
.withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(15)));
REDIS_CLUSTER.start();
Config config = new Config();
config.setProtocol(protocol);
config.useClusterServers()
.setNatMapper(new NatMapper() {
@Override
public RedisURI map(RedisURI uri) {
if (REDIS_CLUSTER.getMappedPort(uri.getPort()) == null) {
return uri;
}
return new RedisURI(uri.getScheme(), REDIS_CLUSTER.getHost(), REDIS_CLUSTER.getMappedPort(uri.getPort()));
}
})
.addNodeAddress("redis://127.0.0.1:" + REDIS_CLUSTER.getFirstMappedPort());
redissonCluster = Redisson.create(config);
}
redissonCallback.accept(new RedissonClusterConnection(redissonCluster));
}
protected void withSentinel(BiConsumer<List<GenericContainer<?>>, Config> callback, int slaves) throws InterruptedException {
Network network = Network.newNetwork();
List<GenericContainer<? extends GenericContainer<?>>> nodes = new ArrayList<>();
GenericContainer<?> master =
new GenericContainer<>("bitnami/redis:7.2.4")
.withNetwork(network)
.withEnv("REDIS_REPLICATION_MODE", "master")
.withEnv("ALLOW_EMPTY_PASSWORD", "yes")
.withNetworkAliases("redis")
.withExposedPorts(6379);
master.start();
assert master.getNetwork() == network;
int masterPort = master.getFirstMappedPort();
master.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(masterPort)),
cmd.getExposedPorts()[0]));
});
nodes.add(master);
for (int i = 0; i < slaves; i++) {
GenericContainer<?> slave =
new GenericContainer<>("bitnami/redis:7.2.4")
.withNetwork(network)
.withEnv("REDIS_REPLICATION_MODE", "slave")
.withEnv("REDIS_MASTER_HOST", "redis")
.withEnv("ALLOW_EMPTY_PASSWORD", "yes")
.withNetworkAliases("slave" + i)
.withExposedPorts(6379);
slave.start();
int slavePort = slave.getFirstMappedPort();
slave.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(slavePort)),
cmd.getExposedPorts()[0]));
});
nodes.add(slave);
}
GenericContainer<?> sentinel1 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel1")
.withExposedPorts(26379);
sentinel1.start();
int sentinel1Port = sentinel1.getFirstMappedPort();
sentinel1.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel1Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel1);
GenericContainer<?> sentinel2 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel2")
.withExposedPorts(26379);
sentinel2.start();
int sentinel2Port = sentinel2.getFirstMappedPort();
sentinel2.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel2Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel2);
GenericContainer<?> sentinel3 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel3")
.withExposedPorts(26379);
sentinel3.start();
int sentinel3Port = sentinel3.getFirstMappedPort();
sentinel3.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel3Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel3);
Thread.sleep(5000);
Config config = new Config();
config.setProtocol(protocol);
config.useSentinelServers()
.setPingConnectionInterval(0)
.setNatMapper(new NatMapper() {
@Override
public RedisURI map(RedisURI uri) {
for (GenericContainer<? extends GenericContainer<?>> node : nodes) {
if (node.getContainerInfo() == null) {
continue;
}
Ports.Binding[] mappedPort = node.getContainerInfo().getNetworkSettings()
.getPorts().getBindings().get(new ExposedPort(uri.getPort()));
Map<String, ContainerNetwork> ss = node.getContainerInfo().getNetworkSettings().getNetworks();
ContainerNetwork s = ss.values().iterator().next();
if (uri.getPort() == 6379
&& !uri.getHost().equals("redis")
&& BaseTest.this.getClass() == BaseTest.class
&& node.getNetworkAliases().contains("slave0")) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
if (mappedPort != null
&& s.getIpAddress().equals(uri.getHost())) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
}
return uri;
}
})
.addSentinelAddress("redis://127.0.0.1:" + sentinel1.getFirstMappedPort())
.setMasterName("mymaster");
callback.accept(nodes, config);
nodes.forEach(n -> n.stop());
network.close();
}
@Before
public void beforeEach() {
redisson.getKeys().flushall();
if (redissonCluster != null) {
redissonCluster.getKeys().flushall();
}
}
}

@ -0,0 +1,156 @@
package org.redisson;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.redisson.misc.BiHashMap;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class ClusterRunner {
private final LinkedHashMap<RedisRunner, String> nodes = new LinkedHashMap<>();
private final LinkedHashMap<String, String> slaveMasters = new LinkedHashMap<>();
public ClusterRunner addNode(RedisRunner runner) {
nodes.putIfAbsent(runner, getRandomId());
if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.CLUSTER_ENABLED)) {
runner.clusterEnabled(true);
}
if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.CLUSTER_NODE_TIMEOUT)) {
runner.clusterNodeTimeout(5000);
}
if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.PORT)) {
runner.randomPort(1);
runner.port(RedisRunner.findFreePort());
}
if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.BIND)) {
runner.bind("127.0.0.1");
}
return this;
}
public ClusterRunner addNode(RedisRunner master, RedisRunner... slaves) {
addNode(master);
for (RedisRunner slave : slaves) {
addNode(slave);
slaveMasters.put(nodes.get(slave), nodes.get(master));
}
return this;
}
public synchronized ClusterProcesses run() throws IOException, InterruptedException, RedisRunner.FailedToStartRedisException {
BiHashMap<String, RedisRunner.RedisProcess> processes = new BiHashMap<>();
for (RedisRunner runner : nodes.keySet()) {
List<String> options = getClusterConfig(runner);
String confFile = runner.dir() + File.separator + nodes.get(runner) + ".conf";
System.out.println("WRITING CONFIG: for " + nodes.get(runner));
try (PrintWriter printer = new PrintWriter(new FileWriter(confFile))) {
options.stream().forEach((line) -> {
printer.println(line);
System.out.println(line);
});
}
processes.put(nodes.get(runner), runner.clusterConfigFile(confFile).run());
}
Thread.sleep(1000);
for (RedisRunner.RedisProcess process : processes.valueSet()) {
if (!process.isAlive()) {
throw new RedisRunner.FailedToStartRedisException();
}
}
return new ClusterProcesses(processes);
}
private List<String> getClusterConfig(RedisRunner runner) {
String me = runner.getInitialBindAddr() + ":" + runner.getPort();
List<String> nodeConfig = new ArrayList<>();
int c = 0;
for (RedisRunner node : nodes.keySet()) {
String nodeId = nodes.get(node);
StringBuilder sb = new StringBuilder();
String nodeAddr = node.getInitialBindAddr() + ":" + node.getPort();
sb.append(nodeId).append(" ");
sb.append(nodeAddr).append(" ");
sb.append(me.equals(nodeAddr)
? "myself,"
: "");
boolean isMaster = !slaveMasters.containsKey(nodeId);
if (isMaster) {
sb.append("master -");
} else {
sb.append("slave ").append(slaveMasters.get(nodeId));
}
sb.append(" ");
sb.append("0").append(" ");
sb.append(me.equals(nodeAddr)
? "0"
: "1").append(" ");
sb.append(c + 1).append(" ");
sb.append("connected ");
if (isMaster) {
sb.append(getSlots(c, nodes.size() - slaveMasters.size()));
c++;
}
nodeConfig.add(sb.toString());
}
nodeConfig.add("vars currentEpoch 0 lastVoteEpoch 0");
return nodeConfig;
}
private static String getSlots(int index, int groupNum) {
final double t = 16383;
int start = index == 0 ? 0 : (int) (t / groupNum * index);
int end = index == groupNum - 1 ? (int) t : (int) (t / groupNum * (index + 1)) - 1;
return start + "-" + end;
}
private static String getRandomId() {
final SecureRandom r = new SecureRandom();
return new BigInteger(160, r).toString(16);
}
public static class ClusterProcesses {
private final BiHashMap<String, RedisRunner.RedisProcess> processes;
private ClusterProcesses(BiHashMap<String, RedisRunner.RedisProcess> processes) {
this.processes = processes;
}
public RedisRunner.RedisProcess getProcess(String nodeId) {
return processes.get(nodeId);
}
public String getNodeId(RedisRunner.RedisProcess process) {
return processes.reverseGet(process);
}
public Set<RedisRunner.RedisProcess> getNodes() {
return processes.valueSet();
}
public Set<String> getNodeIds() {
return processes.keySet();
}
public synchronized Map<String, Integer> shutdown() {
return processes
.entrySet()
.stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().stop()));
}
}
}

@ -0,0 +1,58 @@
package org.redisson;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class RedisVersion implements Comparable<RedisVersion>{
private final String fullVersion;
private final Integer majorVersion;
private final Integer minorVersion;
private final Integer patchVersion;
public RedisVersion(String fullVersion) {
this.fullVersion = fullVersion;
Matcher matcher = Pattern.compile("^([\\d]+)\\.([\\d]+)\\.([\\d]+)$").matcher(fullVersion);
matcher.find();
majorVersion = Integer.parseInt(matcher.group(1));
minorVersion = Integer.parseInt(matcher.group(2));
patchVersion = Integer.parseInt(matcher.group(3));
}
public String getFullVersion() {
return fullVersion;
}
public int getMajorVersion() {
return majorVersion;
}
public int getMinorVersion() {
return minorVersion;
}
public int getPatchVersion() {
return patchVersion;
}
@Override
public int compareTo(RedisVersion o) {
int ma = this.majorVersion.compareTo(o.majorVersion);
int mi = this.minorVersion.compareTo(o.minorVersion);
int pa = this.patchVersion.compareTo(o.patchVersion);
return ma != 0 ? ma : mi != 0 ? mi : pa;
}
public int compareTo(String redisVersion) {
return this.compareTo(new RedisVersion(redisVersion));
}
public static int compareTo(String redisVersion1, String redisVersion2) {
return new RedisVersion(redisVersion1).compareTo(redisVersion2);
}
}

@ -0,0 +1,21 @@
package org.redisson;
import java.util.Locale;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class RedissonRuntimeEnvironment {
public static final boolean isTravis = "true".equalsIgnoreCase(System.getProperty("travisEnv"));
public static final String redisBinaryPath = System.getProperty("redisBinary", "C:\\redis\\redis-server.exe");
public static final String tempDir = System.getProperty("java.io.tmpdir");
public static final String OS;
public static final boolean isWindows;
static {
OS = System.getProperty("os.name", "generic");
isWindows = OS.toLowerCase(Locale.ENGLISH).contains("win");
}
}

@ -0,0 +1,16 @@
package org.redisson.spring.data.connection;
import org.junit.Before;
import org.redisson.BaseTest;
import org.springframework.data.redis.connection.RedisConnection;
public abstract class BaseConnectionTest extends BaseTest {
RedisConnection connection;
@Before
public void init() {
connection = new RedissonConnection(redisson);
}
}

@ -0,0 +1,120 @@
package org.redisson.spring.data.connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.BaseTest;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@RunWith(Parameterized.class)
public class RedissonClusterConnectionRenameTest extends BaseTest {
@Parameterized.Parameters(name= "{index} - same slot = {0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{false},
{true}
});
}
@Parameterized.Parameter(0)
public boolean sameSlot;
byte[] originalKey = "key".getBytes();
byte[] newKey = "unset".getBytes();
byte[] value = "value".getBytes();
@Test
public void testRename() {
testInCluster(connection -> {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection);
connection.rename(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
});
}
@Test
public void testRename_pipeline() {
testInCluster(connection -> {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection);
connection.openPipeline();
assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
});
}
protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot, RedissonClusterConnection connection) {
int counter = 0;
byte[] newKey = (new String(originalKey) + counter).getBytes();
Integer newKeySlot = connection.clusterGetSlotForKey(newKey);
while(!newKeySlot.equals(targetSlot)) {
counter++;
newKey = (new String(originalKey) + counter).getBytes();
newKeySlot = connection.clusterGetSlotForKey(newKey);
}
return newKey;
}
@Test
public void testRenameNX() {
testInCluster(connection -> {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection);
Boolean result = connection.renameNX(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
assertThat(result).isTrue();
connection.set(originalKey, value);
result = connection.renameNX(originalKey, newKey);
assertThat(result).isFalse();
});
}
@Test
public void testRenameNX_pipeline() {
testInCluster(connection -> {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection);
connection.openPipeline();
assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
});
}
private Integer getTargetSlot(Integer originalSlot) {
return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1;
}
}

@ -0,0 +1,317 @@
package org.redisson.spring.data.connection;
import net.bytebuddy.utility.RandomString;
import org.junit.Test;
import org.redisson.BaseTest;
import org.redisson.api.RedissonClient;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisNode.NodeType;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.types.RedisClientInfo;
import java.nio.charset.StandardCharsets;
import java.util.*;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonClusterConnectionTest extends BaseTest {
@Test
public void testRandomKey() {
testInCluster(connection -> {
RedissonClient redisson = (RedissonClient) connection.getNativeConnection();
StringRedisTemplate redisTemplate = new StringRedisTemplate();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson));
redisTemplate.afterPropertiesSet();
for (int i = 0; i < 10; i++) {
redisTemplate.opsForValue().set("i" + i, "i" + i);
}
for (RedisClusterNode clusterNode : redisTemplate.getConnectionFactory().getClusterConnection().clusterGetNodes()) {
String key = redisTemplate.opsForCluster().randomKey(clusterNode);
assertThat(key).isNotNull();
}
});
}
@Test
public void testDel() {
testInCluster(connection -> {
List<byte[]> keys = new ArrayList<>();
for (int i = 0; i < 10; i++) {
byte[] key = ("test" + i).getBytes();
keys.add(key);
connection.set(key, ("test" + i).getBytes());
}
assertThat(connection.del(keys.toArray(new byte[0][]))).isEqualTo(10);
});
}
@Test
public void testScan() {
testInCluster(connection -> {
Map<byte[], byte[]> map = new HashMap<>();
for (int i = 0; i < 10000; i++) {
map.put(RandomString.make(32).getBytes(), RandomString.make(32).getBytes(StandardCharsets.UTF_8));
}
connection.mSet(map);
Cursor<byte[]> b = connection.scan(ScanOptions.scanOptions().build());
Set<String> sett = new HashSet<>();
int counter = 0;
while (b.hasNext()) {
byte[] tt = b.next();
sett.add(new String(tt));
counter++;
}
assertThat(sett.size()).isEqualTo(map.size());
assertThat(counter).isEqualTo(map.size());
});
}
@Test
public void testMSet() {
testInCluster(connection -> {
Map<byte[], byte[]> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
map.put(("test" + i).getBytes(), ("test" + i*100).getBytes());
}
connection.mSet(map);
for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
assertThat(connection.get(entry.getKey())).isEqualTo(entry.getValue());
}
});
}
@Test
public void testMGet() {
testInCluster(connection -> {
Map<byte[], byte[]> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
map.put(("test" + i).getBytes(), ("test" + i*100).getBytes());
}
connection.mSet(map);
List<byte[]> r = connection.mGet(map.keySet().toArray(new byte[0][]));
assertThat(r).containsExactly(map.values().toArray(new byte[0][]));
});
}
@Test
public void testClusterGetNodes() {
testInCluster(connection -> {
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
assertThat(nodes).hasSize(6);
for (RedisClusterNode redisClusterNode : nodes) {
assertThat(redisClusterNode.getLinkState()).isNotNull();
assertThat(redisClusterNode.getFlags()).isNotEmpty();
assertThat(redisClusterNode.getHost()).isNotNull();
assertThat(redisClusterNode.getPort()).isNotNull();
assertThat(redisClusterNode.getId()).isNotNull();
assertThat(redisClusterNode.getType()).isNotNull();
if (redisClusterNode.getType() == NodeType.MASTER) {
assertThat(redisClusterNode.getSlotRange().getSlots()).isNotEmpty();
} else {
assertThat(redisClusterNode.getMasterId()).isNotNull();
}
}
});
}
@Test
public void testClusterGetNodesMaster() {
testInCluster(connection -> {
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
for (RedisClusterNode redisClusterNode : nodes) {
if (redisClusterNode.getType() == NodeType.MASTER) {
Collection<RedisClusterNode> slaves = connection.clusterGetReplicas(redisClusterNode);
assertThat(slaves).hasSize(1);
}
}
});
}
@Test
public void testClusterGetMasterSlaveMap() {
testInCluster(connection -> {
Map<RedisClusterNode, Collection<RedisClusterNode>> map = connection.clusterGetMasterReplicaMap();
assertThat(map).hasSize(3);
for (Collection<RedisClusterNode> slaves : map.values()) {
assertThat(slaves).hasSize(1);
}
});
}
@Test
public void testClusterGetSlotForKey() {
testInCluster(connection -> {
Integer slot = connection.clusterGetSlotForKey("123".getBytes());
assertThat(slot).isNotNull();
});
}
@Test
public void testClusterGetNodeForSlot() {
testInCluster(connection -> {
RedisClusterNode node1 = connection.clusterGetNodeForSlot(1);
RedisClusterNode node2 = connection.clusterGetNodeForSlot(16000);
assertThat(node1.getId()).isNotEqualTo(node2.getId());
});
}
@Test
public void testClusterGetNodeForKey() {
testInCluster(connection -> {
RedisClusterNode node = connection.clusterGetNodeForKey("123".getBytes());
assertThat(node).isNotNull();
});
}
@Test
public void testClusterGetClusterInfo() {
testInCluster(connection -> {
ClusterInfo info = connection.clusterGetClusterInfo();
assertThat(info.getSlotsFail()).isEqualTo(0);
assertThat(info.getSlotsOk()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT);
assertThat(info.getSlotsAssigned()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT);
});
}
@Test
public void testClusterAddRemoveSlots() {
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
Integer slot = master.getSlotRange().getSlots().iterator().next();
connection.clusterDeleteSlots(master, slot);
connection.clusterAddSlots(master, slot);
});
}
@Test
public void testClusterCountKeysInSlot() {
testInCluster(connection -> {
Long t = connection.clusterCountKeysInSlot(1);
assertThat(t).isZero();
});
}
@Test
public void testClusterGetKeysInSlot() {
testInCluster(connection -> {
connection.flushAll();
List<byte[]> keys = connection.clusterGetKeysInSlot(12, 10);
assertThat(keys).isEmpty();
});
}
@Test
public void testClusterPing() {
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
String res = connection.ping(master);
assertThat(res).isEqualTo("PONG");
});
}
@Test
public void testDbSize() {
testInCluster(connection -> {
connection.flushAll();
RedisClusterNode master = getFirstMaster(connection);
Long size = connection.dbSize(master);
assertThat(size).isZero();
});
}
@Test
public void testInfo() {
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
Properties info = connection.info(master);
assertThat(info.size()).isGreaterThan(10);
});
}
@Test
public void testDelPipeline() {
testInCluster(connection -> {
byte[] k = "key".getBytes();
byte[] v = "val".getBytes();
connection.set(k, v);
connection.openPipeline();
connection.get(k);
connection.del(k);
List<Object> results = connection.closePipeline();
byte[] val = (byte[])results.get(0);
assertThat(val).isEqualTo(v);
Long res = (Long) results.get(1);
assertThat(res).isEqualTo(1);
});
}
@Test
public void testResetConfigStats() {
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
connection.resetConfigStats(master);
});
}
@Test
public void testTime() {
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
Long time = connection.time(master);
assertThat(time).isGreaterThan(1000);
});
}
@Test
public void testGetClientList() {
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
List<RedisClientInfo> list = connection.getClientList(master);
assertThat(list.size()).isGreaterThan(10);
});
}
@Test
public void testSetConfig() {
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
connection.setConfig(master, "timeout", "10");
});
}
@Test
public void testGetConfig() {
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
Properties config = connection.getConfig(master, "*");
assertThat(config.size()).isGreaterThan(20);
});
}
protected RedisClusterNode getFirstMaster(RedissonClusterConnection connection) {
Map<RedisClusterNode, Collection<RedisClusterNode>> map = connection.clusterGetMasterReplicaMap();
RedisClusterNode master = map.keySet().iterator().next();
return master;
}
@Test
public void testConnectionFactoryReturnsClusterConnection() {
testInCluster(connection -> {
RedissonClient redisson = (RedissonClient) connection.getNativeConnection();
RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson);
assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class);
});
}
}

@ -0,0 +1,287 @@
package org.redisson.spring.data.connection;
import org.junit.Test;
import org.springframework.data.domain.Range;
import org.springframework.data.geo.Circle;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Point;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.RedisGeoCommands;
import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
import org.springframework.data.redis.connection.RedisZSetCommands;
import org.springframework.data.redis.connection.zset.Tuple;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.core.types.Expiration;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonConnectionTest extends BaseConnectionTest {
@Test
public void testZRandMemberScore() {
StringRedisTemplate redisTemplate = new StringRedisTemplate();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson));
redisTemplate.afterPropertiesSet();
redisTemplate.boundZSetOps("test").add("1", 10);
redisTemplate.boundZSetOps("test").add("2", 20);
redisTemplate.boundZSetOps("test").add("3", 30);
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
ReactiveRedisConnection cc = factory.getReactiveConnection();
Tuple b = cc.zSetCommands().zRandMemberWithScore(ByteBuffer.wrap("test".getBytes())).block();
assertThat(b.getScore()).isNotNaN();
assertThat(new String(b.getValue())).isIn("1", "2", "3");
}
@Test
public void testBZPop() {
StringRedisTemplate redisTemplate = new StringRedisTemplate();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson));
redisTemplate.afterPropertiesSet();
redisTemplate.boundZSetOps("test").add("1", 10);
redisTemplate.boundZSetOps("test").add("2", 20);
redisTemplate.boundZSetOps("test").add("3", 30);
ZSetOperations.TypedTuple<String> r = redisTemplate.boundZSetOps("test").popMin(Duration.ofSeconds(1));
assertThat(r.getValue()).isEqualTo("1");
assertThat(r.getScore()).isEqualTo(10);
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
ReactiveRedisConnection cc = factory.getReactiveConnection();
Tuple r2 = cc.zSetCommands().bZPopMin(ByteBuffer.wrap("test".getBytes()), Duration.ofSeconds(1)).block();
assertThat(r2.getValue()).isEqualTo("2".getBytes());
assertThat(r2.getScore()).isEqualTo(20);
}
@Test
public void testZPop() {
StringRedisTemplate redisTemplate = new StringRedisTemplate();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson));
redisTemplate.afterPropertiesSet();
redisTemplate.boundZSetOps("test").add("1", 10);
redisTemplate.boundZSetOps("test").add("2", 20);
redisTemplate.boundZSetOps("test").add("3", 30);
ZSetOperations.TypedTuple<String> r = redisTemplate.boundZSetOps("test").popMin();
assertThat(r.getValue()).isEqualTo("1");
assertThat(r.getScore()).isEqualTo(10);
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
ReactiveRedisConnection cc = factory.getReactiveConnection();
Tuple r2 = cc.zSetCommands().zPopMin(ByteBuffer.wrap("test".getBytes())).block();
assertThat(r2.getValue()).isEqualTo("2".getBytes());
assertThat(r2.getScore()).isEqualTo(20);
}
@Test
public void testZRangeWithScores() {
StringRedisTemplate redisTemplate = new StringRedisTemplate();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson));
redisTemplate.afterPropertiesSet();
redisTemplate.boundZSetOps("test").add("1", 10);
redisTemplate.boundZSetOps("test").add("2", 20);
redisTemplate.boundZSetOps("test").add("3", 30);
Set<ZSetOperations.TypedTuple<String>> objs = redisTemplate.boundZSetOps("test").rangeWithScores(0, 100);
assertThat(objs).hasSize(3);
assertThat(objs).containsExactlyInAnyOrder(ZSetOperations.TypedTuple.of("1", 10D),
ZSetOperations.TypedTuple.of("2", 20D),
ZSetOperations.TypedTuple.of("3", 30D));
}
@Test
public void testZDiff() {
StringRedisTemplate redisTemplate = new StringRedisTemplate();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson));
redisTemplate.afterPropertiesSet();
redisTemplate.boundZSetOps("test").add("1", 10);
redisTemplate.boundZSetOps("test").add("2", 20);
redisTemplate.boundZSetOps("test").add("3", 30);
redisTemplate.boundZSetOps("test").add("4", 30);
redisTemplate.boundZSetOps("test2").add("5", 50);
redisTemplate.boundZSetOps("test2").add("2", 20);
redisTemplate.boundZSetOps("test2").add("3", 30);
redisTemplate.boundZSetOps("test2").add("6", 60);
Set<String> objs = redisTemplate.boundZSetOps("test").difference("test2");
assertThat(objs).hasSize(2);
}
@Test
public void testZLexCount() {
StringRedisTemplate redisTemplate = new StringRedisTemplate();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson));
redisTemplate.afterPropertiesSet();
redisTemplate.boundZSetOps("test").add("1", 10);
redisTemplate.boundZSetOps("test").add("2", 20);
redisTemplate.boundZSetOps("test").add("3", 30);
Long size = redisTemplate.boundZSetOps("test").lexCount(Range.closed("1", "2"));
assertThat(size).isEqualTo(2);
}
@Test
public void testZRemLexByRange() {
StringRedisTemplate redisTemplate = new StringRedisTemplate();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson));
redisTemplate.afterPropertiesSet();
redisTemplate.boundZSetOps("test").add("1", 10);
redisTemplate.boundZSetOps("test").add("2", 20);
redisTemplate.boundZSetOps("test").add("3", 30);
Long size = redisTemplate.boundZSetOps("test")
.removeRangeByLex(Range.closed("1", "2"));
assertThat(size).isEqualTo(2);
}
@Test
public void testReverseRangeByLex() {
StringRedisTemplate redisTemplate = new StringRedisTemplate();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson));
redisTemplate.afterPropertiesSet();
redisTemplate.boundZSetOps("test").add("1", 10);
redisTemplate.boundZSetOps("test").add("2", 20);
Set<String> ops = redisTemplate.boundZSetOps("test")
.reverseRangeByLex(Range.closed("1", "2")
, Limit.limit().count(10));
assertThat(ops.size()).isEqualTo(2);
}
@Test
public void testExecute() {
Long s = (Long) connection.execute("ttl", "key".getBytes());
assertThat(s).isEqualTo(-2);
connection.execute("flushDb");
}
@Test
public void testRandomMembers() {
RedisTemplate<String, Integer> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson));
redisTemplate.afterPropertiesSet();
SetOperations<String, Integer> ops = redisTemplate.opsForSet();
ops.add("val", 1, 2, 3, 4);
Set<Integer> values = redisTemplate.opsForSet().distinctRandomMembers("val", 1L);
assertThat(values).containsAnyOf(1, 2, 3, 4);
Integer v = redisTemplate.opsForSet().randomMember("val");
assertThat(v).isNotNull();
}
@Test
public void testRangeByLex() {
RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson));
redisTemplate.afterPropertiesSet();
RedisZSetCommands.Range range = new RedisZSetCommands.Range();
range.lt("c");
Set<String> zSetValue = redisTemplate.opsForZSet().rangeByLex("val", range);
assertThat(zSetValue).isEmpty();
}
@Test
public void testGeo() {
RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson));
redisTemplate.afterPropertiesSet();
String key = "test_geo_key";
Point point = new Point(116.401001, 40.119499);
redisTemplate.opsForGeo().add(key, point, "a");
point = new Point(111.545998, 36.133499);
redisTemplate.opsForGeo().add(key, point, "b");
point = new Point(111.483002, 36.030998);
redisTemplate.opsForGeo().add(key, point, "c");
Circle within = new Circle(116.401001, 40.119499, 80000);
RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeCoordinates();
GeoResults<RedisGeoCommands.GeoLocation<String>> res = redisTemplate.opsForGeo().radius(key, within, args);
assertThat(res.getContent().get(0).getContent().getName()).isEqualTo("a");
}
@Test
public void testZSet() {
connection.zAdd(new byte[] {1}, -1, new byte[] {1});
connection.zAdd(new byte[] {1}, 2, new byte[] {2});
connection.zAdd(new byte[] {1}, 10, new byte[] {3});
assertThat(connection.zRangeByScore(new byte[] {1}, Double.NEGATIVE_INFINITY, 5))
.containsOnly(new byte[] {1}, new byte[] {2});
}
@Test
public void testEcho() {
assertThat(connection.echo("test".getBytes())).isEqualTo("test".getBytes());
}
@Test
public void testSetGet() {
connection.set("key".getBytes(), "value".getBytes());
assertThat(connection.get("key".getBytes())).isEqualTo("value".getBytes());
}
@Test
public void testSetExpiration() {
assertThat(connection.set("key".getBytes(), "value".getBytes(), Expiration.milliseconds(111122), SetOption.SET_IF_ABSENT)).isTrue();
assertThat(connection.get("key".getBytes())).isEqualTo("value".getBytes());
}
@Test
public void testHSetGet() {
assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isTrue();
assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isEqualTo("value".getBytes());
}
@Test
public void testZScan() {
connection.zAdd("key".getBytes(), 1, "value1".getBytes());
connection.zAdd("key".getBytes(), 2, "value2".getBytes());
Cursor<Tuple> t = connection.zScan("key".getBytes(), ScanOptions.scanOptions().build());
assertThat(t.hasNext()).isTrue();
assertThat(t.next().getValue()).isEqualTo("value1".getBytes());
assertThat(t.hasNext()).isTrue();
assertThat(t.next().getValue()).isEqualTo("value2".getBytes());
}
@Test
public void testRandFieldWithValues() {
connection.hSet("map".getBytes(), "key1".getBytes(), "value1".getBytes());
connection.hSet("map".getBytes(), "key2".getBytes(), "value2".getBytes());
connection.hSet("map".getBytes(), "key3".getBytes(), "value3".getBytes());
List<Map.Entry<byte[], byte[]>> s = connection.hRandFieldWithValues("map".getBytes(), 2);
assertThat(s).hasSize(2);
Map.Entry<byte[], byte[]> s2 = connection.hRandFieldWithValues("map".getBytes());
assertThat(s2).isNotNull();
byte[] f = connection.hRandField("map".getBytes());
assertThat((Object) f).isIn("key1".getBytes(), "key2".getBytes(), "key3".getBytes());
}
}

@ -0,0 +1,96 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.*;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
import org.redisson.BaseTest;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
public class RedissonMultiConnectionTest extends BaseConnectionTest {
@Test
public void testBroken() throws InterruptedException {
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(new RedissonConnectionFactory(redisson));
ExecutorService e = Executors.newFixedThreadPool(32);
AtomicBoolean hasErrors = new AtomicBoolean();
for (int i = 0; i < 10; i++) {
e.submit(() -> {
stringRedisTemplate.execute(new SessionCallback<Void>() {
@Override
public Void execute(RedisOperations operations) throws DataAccessException {
try {
ValueOperations<String, String> valueOps = operations.opsForValue();
operations.multi();
valueOps.set("test3", "value");
} catch (Exception e) {
e.printStackTrace();
hasErrors.set(true);
}
return null;
}
});
stringRedisTemplate.execute(new SessionCallback<Void>() {
@Override
public Void execute(RedisOperations operations) throws DataAccessException {
try {
ValueOperations<String, String> valueOps = operations.opsForValue();
valueOps.set("test1", "value");
assertThat(valueOps.get("test1")).isEqualTo("value");
} catch (Exception e) {
e.printStackTrace();
hasErrors.set(true);
}
return null;
}
});
});
}
e.shutdown();
e.awaitTermination(1, TimeUnit.MINUTES);
assertThat(hasErrors).isFalse();
}
@Test
public void testEcho() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.multi();
assertThat(connection.echo("test".getBytes())).isNull();
assertThat(connection.exec().iterator().next()).isEqualTo("test".getBytes());
}
@Test
public void testSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.multi();
assertThat(connection.isQueueing()).isTrue();
connection.set("key".getBytes(), "value".getBytes());
assertThat(connection.get("key".getBytes())).isNull();
List<Object> result = connection.exec();
assertThat(connection.isQueueing()).isFalse();
assertThat(result.get(0)).isEqualTo("value".getBytes());
}
@Test
public void testHSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.multi();
assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isNull();
assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isNull();
List<Object> result = connection.exec();
assertThat((Boolean)result.get(0)).isTrue();
assertThat(result.get(1)).isEqualTo("value".getBytes());
}
}

@ -0,0 +1,64 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.*;
import java.util.List;
import org.junit.Test;
import org.redisson.BaseTest;
public class RedissonPipelineConnectionTest extends BaseConnectionTest {
@Test
public void testDel() {
RedissonConnection connection = new RedissonConnection(redisson);
byte[] key = "my_key".getBytes();
byte[] value = "my_value".getBytes();
connection.set(key, value);
connection.openPipeline();
connection.get(key);
connection.del(key);
List<Object> results = connection.closePipeline();
byte[] val = (byte[])results.get(0);
assertThat(val).isEqualTo(value);
Long res = (Long) results.get(1);
assertThat(res).isEqualTo(1);
}
@Test
public void testEcho() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.openPipeline();
assertThat(connection.echo("test".getBytes())).isNull();
assertThat(connection.closePipeline().iterator().next()).isEqualTo("test".getBytes());
}
@Test
public void testSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.openPipeline();
assertThat(connection.isPipelined()).isTrue();
connection.set("key".getBytes(), "value".getBytes());
assertThat(connection.get("key".getBytes())).isNull();
List<Object> result = connection.closePipeline();
assertThat(connection.isPipelined()).isFalse();
assertThat(result.get(0)).isEqualTo("value".getBytes());
}
@Test
public void testHSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.openPipeline();
assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isNull();
assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isNull();
List<Object> result = connection.closePipeline();
assertThat((Boolean)result.get(0)).isTrue();
assertThat(result.get(1)).isEqualTo("value".getBytes());
}
}

@ -0,0 +1,166 @@
package org.redisson.spring.data.connection;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.*;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.reactive.CommandReactiveService;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.ReactiveRedisClusterConnection;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@RunWith(Parameterized.class)
public class RedissonReactiveClusterKeyCommandsTest extends BaseTest {
@Parameterized.Parameters(name= "{index} - same slot = {0}; has ttl = {1}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{false, false},
{true, false},
{false, true},
{true, true}
});
}
@Parameterized.Parameter(0)
public boolean sameSlot;
@Parameterized.Parameter(1)
public boolean hasTtl;
ByteBuffer originalKey = ByteBuffer.wrap("key".getBytes());
ByteBuffer newKey = ByteBuffer.wrap("unset".getBytes());
ByteBuffer value = ByteBuffer.wrap("value".getBytes());
private void testInClusterReactive(Consumer<ReactiveRedisClusterConnection> redissonCallback) {
testInCluster(c -> {
RedissonClient redisson = (RedissonClient) c.getNativeConnection();
RedissonReactiveRedisClusterConnection connection = new RedissonReactiveRedisClusterConnection(((RedissonReactive) redisson.reactive()).getCommandExecutor());
redissonCallback.accept(connection);
});
}
@Test
public void testRename() {
testInClusterReactive(connection -> {
connection.stringCommands().set(originalKey, value).block();
if (hasTtl) {
connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block();
}
Integer originalSlot = getSlotForKey(originalKey, (RedissonReactiveRedisClusterConnection) connection);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot), connection);
Boolean response = connection.keyCommands().rename(originalKey, newKey).block();
assertThat(response).isTrue();
final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block();
assertThat(newKeyValue).isEqualTo(value);
if (hasTtl) {
assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0);
} else {
assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1);
}
});
}
@Test
public void testRename_keyNotExist() {
testInClusterReactive(connection -> {
Integer originalSlot = getSlotForKey(originalKey, (RedissonReactiveRedisClusterConnection) connection);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot), connection);
if (sameSlot) {
// This is a quirk of the implementation - since same-slot renames use the non-cluster version,
// the result is a Redis error. This behavior matches other spring-data-redis implementations
assertThatThrownBy(() -> connection.keyCommands().rename(originalKey, newKey).block())
.isInstanceOf(RedisSystemException.class);
} else {
Boolean response = connection.keyCommands().rename(originalKey, newKey).block();
assertThat(response).isTrue();
final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block();
assertThat(newKeyValue).isEqualTo(null);
}
});
}
protected ByteBuffer getNewKeyForSlot(String originalKey, Integer targetSlot, ReactiveRedisClusterConnection connection) {
int counter = 0;
ByteBuffer newKey = ByteBuffer.wrap((originalKey + counter).getBytes());
Integer newKeySlot = getSlotForKey(newKey, (RedissonReactiveRedisClusterConnection) connection);
while(!newKeySlot.equals(targetSlot)) {
counter++;
newKey = ByteBuffer.wrap((originalKey + counter).getBytes());
newKeySlot = getSlotForKey(newKey, (RedissonReactiveRedisClusterConnection) connection);
}
return newKey;
}
@Test
public void testRenameNX() {
testInClusterReactive(connection -> {
connection.stringCommands().set(originalKey, value).block();
if (hasTtl) {
connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block();
}
Integer originalSlot = getSlotForKey(originalKey, (RedissonReactiveRedisClusterConnection) connection);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot), connection);
Boolean result = connection.keyCommands().renameNX(originalKey, newKey).block();
assertThat(result).isTrue();
assertThat(connection.stringCommands().get(newKey).block()).isEqualTo(value);
if (hasTtl) {
assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0);
} else {
assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1);
}
connection.stringCommands().set(originalKey, value).block();
result = connection.keyCommands().renameNX(originalKey, newKey).block();
assertThat(result).isFalse();
});
}
private Integer getTargetSlot(Integer originalSlot) {
return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1;
}
private Integer getSlotForKey(ByteBuffer key, RedissonReactiveRedisClusterConnection connection) {
return (Integer) connection.read(null, StringCodec.INSTANCE, RedisCommands.KEYSLOT, key.array()).block();
}
}

@ -0,0 +1,21 @@
package org.redisson.spring.data.connection;
import org.junit.Test;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import java.time.Duration;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonReactiveKeyCommandsTest extends BaseConnectionTest {
@Test
public void testExpiration() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
ReactiveStringRedisTemplate t = new ReactiveStringRedisTemplate(factory);
t.opsForValue().set("123", "4343").block();
t.expire("123", Duration.ofMillis(1001)).block();
assertThat(t.getExpire("123").block().toMillis()).isBetween(900L, 1000L);
}
}

@ -0,0 +1,33 @@
package org.redisson.spring.data.connection;
import org.junit.Test;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReturnType;
import reactor.core.publisher.Flux;
import java.nio.ByteBuffer;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonScriptReactiveTest extends BaseConnectionTest {
@Test
public void testEval() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
ReactiveRedisConnection cc = factory.getReactiveConnection();
String s = "local ret = {}" +
"local mysqlKeys = {}" +
"table.insert(ret, 'test1')" +
"table.insert(ret, 'test2')" +
"table.insert(ret, 'test3')" +
"table.insert(ret, mysqlKeys)" +
"return ret";
Flux<List<Object>> ss = cc.scriptingCommands().eval(ByteBuffer.wrap(s.getBytes()), ReturnType.MULTI, 0);
List<Object> r = ss.blockFirst();
assertThat(r.get(2)).isEqualTo(ByteBuffer.wrap("test3".getBytes()));
assertThat((List) r.get(3)).isEmpty();
}
}

@ -0,0 +1,132 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.Collection;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.data.redis.connection.RedisSentinelConnection;
import org.springframework.data.redis.connection.RedisServer;
public class RedissonSentinelConnectionTest {
RedissonClient redisson;
RedisSentinelConnection connection;
RedisRunner.RedisProcess master;
RedisRunner.RedisProcess slave1;
RedisRunner.RedisProcess slave2;
RedisRunner.RedisProcess sentinel1;
RedisRunner.RedisProcess sentinel2;
RedisRunner.RedisProcess sentinel3;
@Before
public void before() throws FailedToStartRedisException, IOException, InterruptedException {
master = new RedisRunner()
.nosave()
.randomDir()
.run();
slave1 = new RedisRunner()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
slave2 = new RedisRunner()
.port(6381)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
Thread.sleep(5000);
Config config = new Config();
config.useSentinelServers()
.setLoadBalancer(new RandomLoadBalancer())
.addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster");
redisson = Redisson.create(config);
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
connection = factory.getSentinelConnection();
}
@After
public void after() {
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
slave2.stop();
redisson.shutdown();
}
@Test
public void testMasters() {
Collection<RedisServer> masters = connection.masters();
assertThat(masters).hasSize(1);
}
@Test
public void testSlaves() {
Collection<RedisServer> masters = connection.masters();
Collection<RedisServer> slaves = connection.replicas(masters.iterator().next());
assertThat(slaves).hasSize(2);
}
@Test
public void testRemove() {
Collection<RedisServer> masters = connection.masters();
connection.remove(masters.iterator().next());
}
@Test
public void testMonitor() {
Collection<RedisServer> masters = connection.masters();
RedisServer master = masters.iterator().next();
master.setName(master.getName() + ":");
connection.monitor(master);
}
@Test
public void testFailover() throws InterruptedException {
Collection<RedisServer> masters = connection.masters();
connection.failover(masters.iterator().next());
Thread.sleep(10000);
RedisServer newMaster = connection.masters().iterator().next();
assertThat(masters.iterator().next().getPort()).isNotEqualTo(newMaster.getPort());
}
}

@ -0,0 +1,191 @@
package org.redisson.spring.data.connection;
import mockit.Invocation;
import mockit.Mock;
import mockit.MockUp;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.protocol.CommandData;
import org.redisson.connection.ClientConnectionsEntry;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Nikita Koksharov
*/
public class RedissonStreamTest extends BaseConnectionTest {
@Test
public void testReattachment() throws InterruptedException {
withSentinel((nodes, config) -> {
RedissonClient redissonClient = Redisson.create(config);
RedisConnectionFactory redisConnectionFactory = new RedissonConnectionFactory(redissonClient);
StreamMessageListenerContainer listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, getOptions());
Consumer consumer = Consumer.from("group", "consumer1");
StreamOffset<String> streamOffset = StreamOffset.create("test", ReadOffset.from(">"));
String channel = "test";
AtomicInteger counter = new AtomicInteger();
Subscription subscription = listenerContainer.register(getReadRequest(consumer, streamOffset),
listener(redisConnectionFactory, channel, consumer, counter));
StringRedisTemplate t1 = new StringRedisTemplate(redisConnectionFactory);
t1.opsForStream().createGroup("test", "group");
listenerContainer.start();
AtomicReference<Boolean> invoked = new AtomicReference<>();
new MockUp<ClientConnectionsEntry>() {
@Mock
void reattachBlockingQueue(Invocation inv, CommandData<?, ?> commandData) {
try {
inv.proceed(commandData);
invoked.compareAndSet(null, true);
} catch (Exception e) {
e.printStackTrace();
invoked.set(false);
throw e;
}
}
};
for (int i = 0; i < 3; i++) {
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory);
ObjectRecord<String, String> record = StreamRecords.newRecord()
.ofObject("message")
.withStreamKey(channel);
RecordId recordId = stringRedisTemplate.opsForStream().add(record);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
nodes.get(0).stop();
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
assertThat(invoked.get()).isTrue();
Assertions.assertThat(counter.get()).isEqualTo(3);
listenerContainer.stop();
redissonClient.shutdown();
}, 2);
}
private StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> getOptions() {
return StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.targetType(String.class)
.build();
}
private StreamMessageListenerContainer.StreamReadRequest<String> getReadRequest(Consumer consumer, StreamOffset<String> streamOffset) {
return StreamMessageListenerContainer.StreamReadRequest
.builder(streamOffset)
.consumer(consumer)
.autoAcknowledge(false)
.cancelOnError((err) -> false) // do not stop consuming after error
.build();
}
private <T> StreamListener listener(RedisConnectionFactory redisConnectionFactory, String channel, Consumer consumer, AtomicInteger counter) {
return message -> {
try {
System.out.println("Acknowledging message: " + message.getId());
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory);
stringRedisTemplate.opsForStream().acknowledge(channel, consumer.getGroup(), message.getId());
System.out.println("RECEIVED " + consumer + " " + message);
counter.incrementAndGet();
} catch(Exception e) {
e.printStackTrace();
}
};
}
@Test
public void testPending() {
connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup", ReadOffset.latest(), true);
PendingMessages p = connection.streamCommands().xPending("test".getBytes(), Consumer.from("testGroup", "test1"));
assertThat(p.size()).isEqualTo(0);
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("1".getBytes(), "1".getBytes()));
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("2".getBytes(), "2".getBytes()));
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("3".getBytes(), "3".getBytes()));
List<ByteRecord> l = connection.streamCommands().xReadGroup(Consumer.from("testGroup", "test1"), StreamOffset.create("test".getBytes(), ReadOffset.from(">")));
assertThat(l.size()).isEqualTo(3);
PendingMessages p2 = connection.streamCommands().xPending("test".getBytes(), Consumer.from("testGroup", "test1"));
assertThat(p2.size()).isEqualTo(3);
}
@Test
public void testGroups() {
connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup", ReadOffset.latest(), true);
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("1".getBytes(), "1".getBytes()));
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("2".getBytes(), "2".getBytes()));
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("3".getBytes(), "3".getBytes()));
StreamInfo.XInfoGroups groups = connection.streamCommands().xInfoGroups("test".getBytes());
assertThat(groups.size()).isEqualTo(1);
assertThat(groups.get(0).groupName()).isEqualTo("testGroup");
assertThat(groups.get(0).pendingCount()).isEqualTo(0);
assertThat(groups.get(0).consumerCount()).isEqualTo(0);
assertThat(groups.get(0).lastDeliveredId()).isEqualTo("0-0");
}
@Test
public void testConsumers() {
connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup", ReadOffset.latest(), true);
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("1".getBytes(), "1".getBytes()));
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("2".getBytes(), "2".getBytes()));
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("3".getBytes(), "3".getBytes()));
connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup2", ReadOffset.latest(), true);
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("1".getBytes(), "1".getBytes()));
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("2".getBytes(), "2".getBytes()));
connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("3".getBytes(), "3".getBytes()));
List<ByteRecord> list = connection.streamCommands().xReadGroup(Consumer.from("testGroup", "consumer1"),
StreamOffset.create("test".getBytes(), ReadOffset.lastConsumed()));
assertThat(list.size()).isEqualTo(6);
StreamInfo.XInfoStream info = connection.streamCommands().xInfo("test".getBytes());
assertThat(info.streamLength()).isEqualTo(6);
StreamInfo.XInfoConsumers s1 = connection.streamCommands().xInfoConsumers("test".getBytes(), "testGroup");
assertThat(s1.getConsumerCount()).isEqualTo(1);
assertThat(s1.get(0).consumerName()).isEqualTo("consumer1");
assertThat(s1.get(0).pendingCount()).isEqualTo(6);
assertThat(s1.get(0).idleTimeMs()).isLessThan(100L);
assertThat(s1.get(0).groupName()).isEqualTo("testGroup");
}
}

@ -0,0 +1,131 @@
package org.redisson.spring.data.connection;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.Test;
import org.springframework.data.redis.connection.BitFieldSubCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
@Test
public void testPubSub() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
AtomicLong counter = new AtomicLong();
ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory);
template.listenTo(ChannelTopic.of("test")).flatMap(message -> {
counter.incrementAndGet();
return Mono.empty();
}).subscribe();
for (int i = 0; i < 40; i++) {
ReactiveRedisConnection connection = factory.getReactiveConnection();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
Awaitility.await().atMost(Durations.ONE_SECOND).untilAsserted(() -> {
assertThat(counter.get()).isEqualTo(40);
});
}
@Test
public void testTemplate() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
AtomicLong counter = new AtomicLong();
ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory);
template.listenTo(ChannelTopic.of("test")).flatMap(message -> {
counter.incrementAndGet();
return Mono.empty();
}).subscribe();
template.listenTo(ChannelTopic.of("test2")).flatMap(message -> {
counter.incrementAndGet();
return Mono.empty();
}).subscribe();
ReactiveRedisConnection connection = factory.getReactiveConnection();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> counter.get() == 1);
BitFieldSubCommands commands = BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.UINT_32).valueAt(0)
.get(BitFieldSubCommands.BitFieldType.UINT_32).valueAt(1)
.get(BitFieldSubCommands.BitFieldType.UINT_32).valueAt(2);
for (int i = 0; i < 128; i++) {
template.opsForValue().setBit("key", i, true);
}
AtomicReference<List<Long>> result = new AtomicReference<>();
template.opsForValue().bitField("key", commands).doOnNext(r -> {
result.set(r);
}).subscribe();
Awaitility.waitAtMost(Durations.FIVE_HUNDRED_MILLISECONDS).untilAsserted(() -> {
assertThat(result.get()).isEqualTo(Arrays.asList(0L, 0L, 0L));
});
}
@Test
public void testSubscribe() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
ReactiveRedisConnection connection = factory.getReactiveConnection();
Mono<ReactiveSubscription> s = connection.pubSubCommands().createSubscription();
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
ReactiveSubscription ss = s.block();
ss.subscribe(ByteBuffer.wrap("test".getBytes())).block();
ss.receive().doOnEach(message -> {
msg.set(message.get().getMessage().array());
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
@Test
public void testUnSubscribe() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
ReactiveRedisConnection connection = factory.getReactiveConnection();
Mono<ReactiveSubscription> s = connection.pubSubCommands().createSubscription();
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
ReactiveSubscription ss = s.block();
ss.subscribe(ByteBuffer.wrap("test".getBytes())).block();
ss.receive().doOnEach(message -> {
msg.set(message.get().getMessage().array());
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
}

@ -0,0 +1,290 @@
package org.redisson.spring.data.connection;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.RedisRunner;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import java.io.IOException;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonSubscribeTest extends BaseConnectionTest {
@Test
public void testContainer() {
RedissonConnectionFactory f = new RedissonConnectionFactory(redisson);
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(f);
container.afterPropertiesSet();
container.start();
// for (int i = 0; i < 2; i++) {
// container.addMessageListener(new MessageListener() {
// @Override
// public void onMessage(Message message, byte[] pattern) {
// }
// }, ChannelTopic.of("test"));
// }
//
// container.stop();
//
// container = new RedisMessageListenerContainer();
// container.setConnectionFactory(f);
// container.afterPropertiesSet();
// container.start();
// for (int i = 0; i < 2; i++) {
// container.addMessageListener(new MessageListener() {
// @Override
// public void onMessage(Message message, byte[] pattern) {
// }
// }, PatternTopic.of("*"));
// }
// container.stop();
//
// container= new RedisMessageListenerContainer();
// container.setConnectionFactory(f);
// container.afterPropertiesSet();
// container.start();
for (int i = 0; i < 2; i++) {
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
}
}, ChannelTopic.of("test"+i));
}
container.stop();
container= new RedisMessageListenerContainer();
container.setConnectionFactory(f);
container.afterPropertiesSet();
container.start();
for (int i = 0; i < 2; i++) {
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
}
}, PatternTopic.of("*" + i));
}
container.stop();
}
@Test
public void testCluster() throws IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave() .notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.x,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
;
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config();
config.useClusterServers()
.setPingConnectionInterval(0)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
Queue<String> names = new ConcurrentLinkedQueue<>();
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
names.add(new String(message.getBody()));
}
}, new PatternTopic("__keyevent@0__:expired"));
container.afterPropertiesSet();
container.start();
factory.getConnection().setEx("EG:test:key1".getBytes(), 3, "123".getBytes());
factory.getConnection().setEx("test:key2".getBytes(), 3, "123".getBytes());
factory.getConnection().setEx("test:key1".getBytes(), 3, "123".getBytes());
Awaitility.await().atMost(Durations.FIVE_SECONDS).untilAsserted(() -> {
assertThat(names).containsExactlyInAnyOrder("EG:test:key1", "test:key2", "test:key1");
});
redisson.shutdown();
process.shutdown();
}
@Test
public void testListenersDuplication() {
Queue<byte[]> msg = new ConcurrentLinkedQueue<>();
MessageListener aListener = (message, pattern) -> {
msg.add(message.getBody());
};
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(aListener,
Arrays.asList(new ChannelTopic("a"), new ChannelTopic("b")));
container.addMessageListener(aListener,
Arrays.asList(new PatternTopic("c*")));
container.afterPropertiesSet();
container.start();
RedisConnection c = factory.getConnection();
c.publish("a".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Durations.ONE_SECOND)
.untilAsserted(() -> {
assertThat(msg).containsExactly("msg".getBytes());
});
}
@Test
public void testPatterTopic() throws IOException, InterruptedException {
RedisRunner.RedisProcess instance = new RedisRunner()
.nosave()
.randomPort()
.randomDir()
.notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
.run();
Config config = new Config();
config.useSingleServer().setAddress(instance.getRedisServerAddressAndPort()).setPingConnectionInterval(0);
RedissonClient redisson = Redisson.create(config);
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
AtomicInteger counterTest = new AtomicInteger();
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
counterTest.incrementAndGet();
}
}, new PatternTopic("__keyspace@0__:mykey"));
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
counterTest.incrementAndGet();
}
}, new PatternTopic("__keyevent@0__:del"));
container.afterPropertiesSet();
container.start();
assertThat(container.isRunning()).isTrue();
RedisConnection c = factory.getConnection();
c.set("mykey".getBytes(), "2".getBytes());
c.del("mykey".getBytes());
Awaitility.await().atMost(Durations.FIVE_SECONDS).until(() -> {
return counterTest.get() == 3;
});
container.stop();
redisson.shutdown();
}
@Test
public void testSubscribe() {
RedissonConnection connection = new RedissonConnection(redisson);
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
connection.subscribe(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
msg.set(message.getBody());
}
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
connection.publish("test".getBytes(), "msg".getBytes());
}
@Test
public void testUnSubscribe() {
RedissonConnection connection = new RedissonConnection(redisson);
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
connection.subscribe(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
msg.set(message.getBody());
}
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
}
}

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2012 Nikita Koksharov
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy.MM.dd HH:mm:ss.SSS} %-5level %c{0} : %msg%n</pattern>
</encoder>
</appender>
<logger name="org.redisson" additivity="true">
<level value="info"/>
</logger>
<root>
<level value="info"/>
<appender-ref ref="CONSOLE"/>
</root>
</configuration>
Loading…
Cancel
Save