From ee5f9a98fbc2c77e3de5ba800b026454a6fe3ac0 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 30 May 2024 11:31:57 +0300 Subject: [PATCH] Feature - Spring Data Redis 3.3.0 integration --- .../redisson-spring-data-33/pom.xml | 100 ++ .../src/test/java/org/redisson/BaseTest.java | 282 +++++ .../test/java/org/redisson/ClusterRunner.java | 156 +++ .../test/java/org/redisson/RedisRunner.java | 1070 +++++++++++++++++ .../test/java/org/redisson/RedisVersion.java | 58 + .../redisson/RedissonRuntimeEnvironment.java | 21 + .../data/connection/BaseConnectionTest.java | 16 + .../RedissonClusterConnectionRenameTest.java | 120 ++ .../RedissonClusterConnectionTest.java | 317 +++++ .../connection/RedissonConnectionTest.java | 287 +++++ .../RedissonMultiConnectionTest.java | 96 ++ .../RedissonPipelineConnectionTest.java | 64 + ...edissonReactiveClusterKeyCommandsTest.java | 166 +++ .../RedissonReactiveKeyCommandsTest.java | 21 + .../RedissonScriptReactiveTest.java | 33 + .../RedissonSentinelConnectionTest.java | 132 ++ .../data/connection/RedissonStreamTest.java | 191 +++ .../RedissonSubscribeReactiveTest.java | 131 ++ .../connection/RedissonSubscribeTest.java | 290 +++++ .../src/test/resources/logback.xml | 36 + 20 files changed, 3587 insertions(+) create mode 100644 redisson-spring-data/redisson-spring-data-33/pom.xml create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/BaseTest.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/ClusterRunner.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/RedisRunner.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/RedisVersion.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/RedissonRuntimeEnvironment.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/BaseConnectionTest.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonConnectionTest.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonMultiConnectionTest.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonPipelineConnectionTest.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommandsTest.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonScriptReactiveTest.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonSentinelConnectionTest.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonStreamTest.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java create mode 100644 redisson-spring-data/redisson-spring-data-33/src/test/resources/logback.xml diff --git a/redisson-spring-data/redisson-spring-data-33/pom.xml b/redisson-spring-data/redisson-spring-data-33/pom.xml new file mode 100644 index 000000000..c864c2569 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/pom.xml @@ -0,0 +1,100 @@ + + 4.0.0 + + + org.redisson + redisson-spring-data + 3.30.1-SNAPSHOT + ../ + + + redisson-spring-data-33 + jar + + Redisson/Spring Data Redis v3.3.x integration + + + + org.springframework.data + spring-data-redis + 3.3.0 + + + + ch.qos.logback + logback-classic + 1.4.12 + test + + + + com.github.hazendaz.jmockit + jmockit + test + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + true + + 4 + true + ${argLine} -javaagent:"${settings.localRepository}"/com/github/hazendaz/jmockit/jmockit/1.52.0/jmockit-1.52.0.jar + + + + + maven-jar-plugin + 3.4.1 + + + + ${maven.build.timestamp} + redisson.spring.data27 + + + + + + + com.mycila + license-maven-plugin + 4.3 + + ${basedir} +
${basedir}/../../header.txt
+ false + true + false + + src/main/java/org/redisson/ + + + target/** + + true + + JAVADOC_STYLE + + true + true + UTF-8 +
+ + + + check + + + +
+ +
+
+ +
diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/BaseTest.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/BaseTest.java new file mode 100644 index 000000000..69e6077a8 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/BaseTest.java @@ -0,0 +1,282 @@ +package org.redisson; + +import com.github.dockerjava.api.model.ContainerNetwork; +import com.github.dockerjava.api.model.ExposedPort; +import com.github.dockerjava.api.model.PortBinding; +import com.github.dockerjava.api.model.Ports; +import org.junit.Before; +import org.junit.BeforeClass; +import org.redisson.api.NatMapper; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.config.Protocol; +import org.redisson.misc.RedisURI; +import org.redisson.spring.data.connection.RedissonClusterConnection; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public abstract class BaseTest { + + protected static final String NOTIFY_KEYSPACE_EVENTS = "--notify-keyspace-events"; + + protected static final GenericContainer REDIS = createRedis(); + + protected static final Protocol protocol = Protocol.RESP2; + + protected static RedissonClient redisson; + + protected static RedissonClient redissonCluster; + + private static GenericContainer REDIS_CLUSTER; + + protected static GenericContainer createRedis() { + return createRedis("7.2"); + } + + protected static GenericContainer createRedis(String version) { + return new GenericContainer<>("redis:" + version) + .withCreateContainerCmdModifier(cmd -> { + cmd.withCmd("redis-server", "--save", "''"); + }) + .withExposedPorts(6379); + } + + @BeforeClass + public static void beforeAll() { + if (redisson == null) { + REDIS.start(); + Config config = createConfig(); + redisson = Redisson.create(config); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + redisson.shutdown(); + REDIS.stop(); + if (redissonCluster != null) { + redissonCluster.shutdown(); + redissonCluster = null; + } + if (REDIS_CLUSTER != null) { + REDIS_CLUSTER.stop(); + REDIS_CLUSTER = null; + } + })); + } + } + + protected static Config createConfig() { + Config config = new Config(); + config.setProtocol(protocol); + config.useSingleServer() + .setAddress("redis://127.0.0.1:" + REDIS.getFirstMappedPort()); + return config; + } + + protected static RedissonClient createInstance() { + Config config = createConfig(); + return Redisson.create(config); + } + + protected void testWithParams(Consumer redissonCallback, String... params) { + GenericContainer redis = + new GenericContainer<>("redis:7.2") + .withCreateContainerCmdModifier(cmd -> { + List args = new ArrayList<>(); + args.add("redis-server"); + args.addAll(Arrays.asList(params)); + cmd.withCmd(args); + }) + .withExposedPorts(6379); + redis.start(); + + Config config = new Config(); + config.setProtocol(protocol); + config.useSingleServer().setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort()); + RedissonClient redisson = Redisson.create(config); + + try { + redissonCallback.accept(redisson); + } finally { + redisson.shutdown(); + redis.stop(); + } + + } + + protected void testInCluster(Consumer redissonCallback) { + if (redissonCluster == null) { + REDIS_CLUSTER = new GenericContainer<>("vishnunair/docker-redis-cluster") + .withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384) + .withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(15))); + REDIS_CLUSTER.start(); + + Config config = new Config(); + config.setProtocol(protocol); + config.useClusterServers() + .setNatMapper(new NatMapper() { + @Override + public RedisURI map(RedisURI uri) { + if (REDIS_CLUSTER.getMappedPort(uri.getPort()) == null) { + return uri; + } + return new RedisURI(uri.getScheme(), REDIS_CLUSTER.getHost(), REDIS_CLUSTER.getMappedPort(uri.getPort())); + } + }) + .addNodeAddress("redis://127.0.0.1:" + REDIS_CLUSTER.getFirstMappedPort()); + redissonCluster = Redisson.create(config); + } + + redissonCallback.accept(new RedissonClusterConnection(redissonCluster)); + } + + protected void withSentinel(BiConsumer>, Config> callback, int slaves) throws InterruptedException { + Network network = Network.newNetwork(); + + List>> nodes = new ArrayList<>(); + + GenericContainer master = + new GenericContainer<>("bitnami/redis:7.2.4") + .withNetwork(network) + .withEnv("REDIS_REPLICATION_MODE", "master") + .withEnv("ALLOW_EMPTY_PASSWORD", "yes") + .withNetworkAliases("redis") + .withExposedPorts(6379); + master.start(); + assert master.getNetwork() == network; + int masterPort = master.getFirstMappedPort(); + master.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(masterPort)), + cmd.getExposedPorts()[0])); + }); + nodes.add(master); + + for (int i = 0; i < slaves; i++) { + GenericContainer slave = + new GenericContainer<>("bitnami/redis:7.2.4") + .withNetwork(network) + .withEnv("REDIS_REPLICATION_MODE", "slave") + .withEnv("REDIS_MASTER_HOST", "redis") + .withEnv("ALLOW_EMPTY_PASSWORD", "yes") + .withNetworkAliases("slave" + i) + .withExposedPorts(6379); + slave.start(); + int slavePort = slave.getFirstMappedPort(); + slave.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(slavePort)), + cmd.getExposedPorts()[0])); + }); + nodes.add(slave); + } + + GenericContainer sentinel1 = + new GenericContainer<>("bitnami/redis-sentinel:7.2.4") + + .withNetwork(network) + .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") + .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") + .withNetworkAliases("sentinel1") + .withExposedPorts(26379); + sentinel1.start(); + int sentinel1Port = sentinel1.getFirstMappedPort(); + sentinel1.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel1Port)), + cmd.getExposedPorts()[0])); + }); + nodes.add(sentinel1); + + GenericContainer sentinel2 = + new GenericContainer<>("bitnami/redis-sentinel:7.2.4") + .withNetwork(network) + .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") + .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") + .withNetworkAliases("sentinel2") + .withExposedPorts(26379); + sentinel2.start(); + int sentinel2Port = sentinel2.getFirstMappedPort(); + sentinel2.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel2Port)), + cmd.getExposedPorts()[0])); + }); + nodes.add(sentinel2); + + GenericContainer sentinel3 = + new GenericContainer<>("bitnami/redis-sentinel:7.2.4") + .withNetwork(network) + .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") + .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") + .withNetworkAliases("sentinel3") + .withExposedPorts(26379); + sentinel3.start(); + int sentinel3Port = sentinel3.getFirstMappedPort(); + sentinel3.withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel3Port)), + cmd.getExposedPorts()[0])); + }); + nodes.add(sentinel3); + + Thread.sleep(5000); + + Config config = new Config(); + config.setProtocol(protocol); + config.useSentinelServers() + .setPingConnectionInterval(0) + .setNatMapper(new NatMapper() { + + @Override + public RedisURI map(RedisURI uri) { + for (GenericContainer> node : nodes) { + if (node.getContainerInfo() == null) { + continue; + } + + Ports.Binding[] mappedPort = node.getContainerInfo().getNetworkSettings() + .getPorts().getBindings().get(new ExposedPort(uri.getPort())); + + Map ss = node.getContainerInfo().getNetworkSettings().getNetworks(); + ContainerNetwork s = ss.values().iterator().next(); + + if (uri.getPort() == 6379 + && !uri.getHost().equals("redis") + && BaseTest.this.getClass() == BaseTest.class + && node.getNetworkAliases().contains("slave0")) { + return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); + } + + if (mappedPort != null + && s.getIpAddress().equals(uri.getHost())) { + return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); + } + } + return uri; + } + }) + .addSentinelAddress("redis://127.0.0.1:" + sentinel1.getFirstMappedPort()) + .setMasterName("mymaster"); + + callback.accept(nodes, config); + + nodes.forEach(n -> n.stop()); + network.close(); + } + + @Before + public void beforeEach() { + redisson.getKeys().flushall(); + if (redissonCluster != null) { + redissonCluster.getKeys().flushall(); + } + } +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/ClusterRunner.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/ClusterRunner.java new file mode 100644 index 000000000..69af59386 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/ClusterRunner.java @@ -0,0 +1,156 @@ +package org.redisson; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.math.BigInteger; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.redisson.misc.BiHashMap; + +/** + * + * @author Rui Gu (https://github.com/jackygurui) + */ +public class ClusterRunner { + + private final LinkedHashMap nodes = new LinkedHashMap<>(); + private final LinkedHashMap slaveMasters = new LinkedHashMap<>(); + + public ClusterRunner addNode(RedisRunner runner) { + nodes.putIfAbsent(runner, getRandomId()); + if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.CLUSTER_ENABLED)) { + runner.clusterEnabled(true); + } + if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.CLUSTER_NODE_TIMEOUT)) { + runner.clusterNodeTimeout(5000); + } + if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.PORT)) { + runner.randomPort(1); + runner.port(RedisRunner.findFreePort()); + } + if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.BIND)) { + runner.bind("127.0.0.1"); + } + return this; + } + + public ClusterRunner addNode(RedisRunner master, RedisRunner... slaves) { + addNode(master); + for (RedisRunner slave : slaves) { + addNode(slave); + slaveMasters.put(nodes.get(slave), nodes.get(master)); + } + return this; + } + + public synchronized ClusterProcesses run() throws IOException, InterruptedException, RedisRunner.FailedToStartRedisException { + BiHashMap processes = new BiHashMap<>(); + for (RedisRunner runner : nodes.keySet()) { + List options = getClusterConfig(runner); + String confFile = runner.dir() + File.separator + nodes.get(runner) + ".conf"; + System.out.println("WRITING CONFIG: for " + nodes.get(runner)); + try (PrintWriter printer = new PrintWriter(new FileWriter(confFile))) { + options.stream().forEach((line) -> { + printer.println(line); + System.out.println(line); + }); + } + processes.put(nodes.get(runner), runner.clusterConfigFile(confFile).run()); + } + Thread.sleep(1000); + for (RedisRunner.RedisProcess process : processes.valueSet()) { + if (!process.isAlive()) { + throw new RedisRunner.FailedToStartRedisException(); + } + } + return new ClusterProcesses(processes); + } + + private List getClusterConfig(RedisRunner runner) { + String me = runner.getInitialBindAddr() + ":" + runner.getPort(); + List nodeConfig = new ArrayList<>(); + int c = 0; + for (RedisRunner node : nodes.keySet()) { + String nodeId = nodes.get(node); + StringBuilder sb = new StringBuilder(); + String nodeAddr = node.getInitialBindAddr() + ":" + node.getPort(); + sb.append(nodeId).append(" "); + sb.append(nodeAddr).append(" "); + sb.append(me.equals(nodeAddr) + ? "myself," + : ""); + boolean isMaster = !slaveMasters.containsKey(nodeId); + if (isMaster) { + sb.append("master -"); + } else { + sb.append("slave ").append(slaveMasters.get(nodeId)); + } + sb.append(" "); + sb.append("0").append(" "); + sb.append(me.equals(nodeAddr) + ? "0" + : "1").append(" "); + sb.append(c + 1).append(" "); + sb.append("connected "); + if (isMaster) { + sb.append(getSlots(c, nodes.size() - slaveMasters.size())); + c++; + } + nodeConfig.add(sb.toString()); + } + nodeConfig.add("vars currentEpoch 0 lastVoteEpoch 0"); + return nodeConfig; + } + + private static String getSlots(int index, int groupNum) { + final double t = 16383; + int start = index == 0 ? 0 : (int) (t / groupNum * index); + int end = index == groupNum - 1 ? (int) t : (int) (t / groupNum * (index + 1)) - 1; + return start + "-" + end; + } + + private static String getRandomId() { + final SecureRandom r = new SecureRandom(); + return new BigInteger(160, r).toString(16); + } + + public static class ClusterProcesses { + private final BiHashMap processes; + + private ClusterProcesses(BiHashMap processes) { + this.processes = processes; + } + + public RedisRunner.RedisProcess getProcess(String nodeId) { + return processes.get(nodeId); + } + + public String getNodeId(RedisRunner.RedisProcess process) { + return processes.reverseGet(process); + } + + public Set getNodes() { + return processes.valueSet(); + } + + public Set getNodeIds() { + return processes.keySet(); + } + + public synchronized Map shutdown() { + return processes + .entrySet() + .stream() + .collect(Collectors.toMap( + e -> e.getKey(), + e -> e.getValue().stop())); + } + } +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/RedisRunner.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/RedisRunner.java new file mode 100644 index 000000000..5f6a07eda --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/RedisRunner.java @@ -0,0 +1,1070 @@ +package org.redisson; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.Inet4Address; +import java.net.ServerSocket; +import java.net.URL; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import org.redisson.client.RedisClient; +import org.redisson.client.RedisClientConfig; +import org.redisson.client.RedisConnection; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.RedisStrictCommand; +import org.redisson.client.protocol.convertor.VoidReplayConvertor; + +/** + * + * @author Rui Gu (https://github.com/jackygurui) + */ +public class RedisRunner { + + public enum REDIS_OPTIONS { + + BINARY_PATH, + DAEMONIZE, + PIDFILE, + PORT, + TCP_BACKLOG, + BIND(true), + UNIXSOCKET, + UNIXSOCKETPERM, + TIMEOUT, + TCP_KEEPALIVE, + LOGLEVEL, + LOGFILE, + SYSLOG_ENABLED, + SYSLOG_IDENT, + SYSLOG_FACILITY, + DATABASES, + SAVE(true), + STOP_WRITES_ON_BGSAVE_ERROR, + RDBCOMPRESSION, + RDBCHECKSUM, + DBFILENAME, + DIR, + SLAVEOF, + MASTERAUTH, + SLAVE_SERVE_STALE_DATA, + SLAVE_READ_ONLY, + REPL_DISKLESS_SYNC, + REPL_DISKLESS_SYNC_DELAY, + REPL_PING_SLAVE_PERIOD, + REPL_TIMEOUT, + REPL_DISABLE_TCP_NODELAY, + REPL_BACKLOG_SIZE, + REPL_BACKLOG_TTL, + SLAVE_PRIORITY, + MIN_SLAVES_TO_WRITE, + MIN_SLAVES_MAX_LAG, + REQUIREPASS, + RENAME_COMMAND(true), + MAXCLIENTS, + MAXMEMORY, + MAXMEMORY_POLICY, + MAXMEMORY_SAMPLE, + APPENDONLY, + APPENDFILENAME, + APPENDFSYNC, + NO_APPENDFSYNC_ON_REWRITE, + AUTO_AOF_REWRITE_PERCENTAGE, + AUTO_AOF_REWRITE_MIN_SIZE, + AOF_LOAD_TRUNCATED, + LUA_TIME_LIMIT, + CLUSTER_ENABLED, + CLUSTER_CONFIG_FILE, + CLUSTER_NODE_TIMEOUT, + CLUSTER_SLAVE_VALIDITY_FACTOR, + CLUSTER_MIGRATION_BARRIER, + CLUSTER_REQUIRE_FULL_COVERAGE, + SLOWLOG_LOG_SLOWER_THAN, + SLOWLOG_MAX_LEN, + LATENCY_MONITOR_THRESHOLD, + NOTIFY_KEYSPACE_EVENTS, + HASH_MAX_ZIPLIST_ENTRIES, + HASH_MAX_ZIPLIST_VALUE, + LIST_MAX_ZIPLIST_ENTRIES, + LIST_MAX_ZIPLIST_VALUE, + SET_MAX_INTSET_ENTRIES, + ZSET_MAX_ZIPLIST_ENTRIES, + ZSET_MAX_ZIPLIST_VALUE, + HLL_SPARSE_MAX_BYTES, + ACTIVEREHASHING, + CLIENT_OUTPUT_BUFFER_LIMIT$NORMAL, + CLIENT_OUTPUT_BUFFER_LIMIT$SLAVE, + CLIENT_OUTPUT_BUFFER_LIMIT$PUBSUB, + HZ, + AOF_REWRITE_INCREMENTAL_FSYNC, + PROTECTED_MODE, + SENTINEL, + SENTINEL$ANNOUNCE_IP, + SENTINEL$ANNOUNCE_PORT, + SENTINEL$MONITOR(true), + SENTINEL$AUTH_PASS(true), + SENTINEL$DOWN_AFTER_MILLISECONDS(true), + SENTINEL$PARALLEL_SYNCS(true), + SENTINEL$FAILOVER_TIMEOUT(true), + SENTINEL$NOTIFICATION_SCRIPT(true), + SENTINEL$CLIENT_RECONFIG_SCRIPT(true) + ; + + private final boolean allowMutiple; + + private REDIS_OPTIONS() { + this.allowMutiple = false; + } + + private REDIS_OPTIONS(boolean allowMutiple) { + this.allowMutiple = allowMutiple; + } + + public boolean isAllowMultiple() { + return allowMutiple; + } + } + + public enum LOGLEVEL_OPTIONS { + + DEBUG, + VERBOSE, + NOTICE, + WARNING + } + + public enum SYSLOG_FACILITY_OPTIONS { + + USER, + LOCAL0, + LOCAL1, + LOCAL2, + LOCAL3, + LOCAL4, + LOCAL5, + LOCAL6, + LOCAL7 + } + + public enum MAX_MEMORY_POLICY_OPTIONS { + + VOLATILE_LRU, + ALLKEYS_LRU, + VOLATILE_RANDOM, + ALLKEYS_RANDOM, + VOLATILE_TTL, + NOEVICTION + } + + public enum APPEND_FSYNC_MODE_OPTIONS { + + ALWAYS, + EVERYSEC, + NO + } + + public enum KEYSPACE_EVENTS_OPTIONS { + + K, + E, + g, + $, + l, + s, + h, + z, + x, + e, + A + } + + private final LinkedHashMap options = new LinkedHashMap<>(); + protected static RedisRunner.RedisProcess defaultRedisInstance; + private static int defaultRedisInstanceExitCode; + + private String path = ""; + private String defaultDir = Paths.get("").toString(); + private boolean nosave = false; + private boolean randomDir = false; + private ArrayList bindAddr = new ArrayList<>(); + private int port = 6379; + private int retryCount = Integer.MAX_VALUE; + private boolean randomPort = false; + private String sentinelFile; + private String clusterFile; + + { + this.options.put(REDIS_OPTIONS.BINARY_PATH, RedissonRuntimeEnvironment.redisBinaryPath); + } + + /** + * To change the redisBinary system property for running the test, + * use argLine option from surefire plugin: + * + * $ mvn -DargLine="-DredisBinary=`which redis-server`" -Punit-test clean \ + * verify + * + * @param configPath + * @return Process running redis instance + * @throws IOException + * @throws InterruptedException + * @see + * + * http://maven.apache.org/surefire/maven-surefire-plugin/test-mojo.html#argLine + */ + public static RedisProcess runRedisWithConfigFile(String configPath) throws IOException, InterruptedException { + URL resource = RedisRunner.class.getResource(configPath); + return runWithOptions(new RedisRunner(), RedissonRuntimeEnvironment.redisBinaryPath, resource.getFile()); + } + + private static RedisProcess runWithOptions(RedisRunner runner, String... options) throws IOException, InterruptedException { + List launchOptions = Arrays.stream(options) + .map(x -> Arrays.asList(x.split(" "))).flatMap(x -> x.stream()) + .collect(Collectors.toList()); + System.out.println("REDIS LAUNCH OPTIONS: " + Arrays.toString(launchOptions.toArray())); + ProcessBuilder master = new ProcessBuilder(launchOptions) + .redirectErrorStream(true) + .directory(new File(RedissonRuntimeEnvironment.tempDir)); + Process p = master.start(); + new Thread(() -> { + BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream())); + String line; + try { + while (p.isAlive() && (line = reader.readLine()) != null && !RedissonRuntimeEnvironment.isTravis) { + System.out.println("REDIS PROCESS: " + line); + } + } catch (IOException ex) { + System.out.println("Exception: " + ex.getLocalizedMessage()); + } + }).start(); + Thread.sleep(1500); + return new RedisProcess(p, runner); + } + + public RedisProcess run() throws IOException, InterruptedException, FailedToStartRedisException { + if (!options.containsKey(REDIS_OPTIONS.DIR)) { + addConfigOption(REDIS_OPTIONS.DIR, defaultDir); + } + if (randomPort) { + for (int i = 0; i < retryCount; i++) { + this.port = findFreePort(); + addConfigOption(REDIS_OPTIONS.PORT, this.port); + try { + return runAndCheck(); + } catch (FailedToStartRedisException e) { + } + } + throw new FailedToStartRedisException(); + } else { + return runAndCheck(); + } + } + + public RedisProcess runAndCheck() throws IOException, InterruptedException, FailedToStartRedisException { + List args = new ArrayList(options.values()); + if (sentinelFile != null && sentinelFile.length() > 0) { + String confFile = defaultDir + File.separator + sentinelFile; + try (PrintWriter printer = new PrintWriter(new FileWriter(confFile))) { + args.stream().forEach((arg) -> { + if (arg.contains("--")) { + printer.println(arg.replace("--", "")); + } + }); + } + args = args.subList(0, 1); + args.add(confFile); + args.add("--sentinel"); + } + RedisProcess rp = runWithOptions(this, args.toArray(new String[0])); + if (!isCluster() + && rp.redisProcess.waitFor(1000, TimeUnit.MILLISECONDS)) { + throw new FailedToStartRedisException(); + } + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + rp.stop(); + })); + return rp; + } + + public boolean hasOption(REDIS_OPTIONS option) { + return options.containsKey(option); + } + + private void addConfigOption(REDIS_OPTIONS option, Object... args) { + StringBuilder sb = new StringBuilder("--") + .append(option.toString() + .replaceAll("_", "-") + .replaceAll("\\$", " ") + .toLowerCase()) + .append(" ") + .append(Arrays.stream(args).map(Object::toString) + .collect(Collectors.joining(" "))); + this.options.put(option, + option.isAllowMultiple() + ? sb.insert(0, this.options.getOrDefault(option, "")).toString() + : sb.toString()); + } + + private String convertBoolean(boolean b) { + return b ? "yes" : "no"; + } + + public RedisRunner daemonize(boolean daemonize) { + addConfigOption(REDIS_OPTIONS.DAEMONIZE, convertBoolean(daemonize)); + return this; + } + + public RedisRunner pidfile(String pidfile) { + addConfigOption(REDIS_OPTIONS.PIDFILE, pidfile); + return this; + } + + public RedisRunner port(int port) { + this.port = port; + this.randomPort = false; + addConfigOption(REDIS_OPTIONS.PORT, port); + return this; + } + + public RedisRunner randomPort() { + return randomPort(Integer.MAX_VALUE); + } + + public RedisRunner randomPort(int retryCount) { + this.randomPort = true; + this.retryCount = retryCount; + options.remove(REDIS_OPTIONS.PORT); + return this; + } + + public int getPort() { + return this.port; + } + + public RedisRunner tcpBacklog(long tcpBacklog) { + addConfigOption(REDIS_OPTIONS.TCP_BACKLOG, tcpBacklog); + return this; + } + + public RedisRunner bind(String bind) { + this.bindAddr.add(bind); + addConfigOption(REDIS_OPTIONS.BIND, bind); + return this; + } + + public ArrayList getBindAddr() { + return this.bindAddr; + } + + public RedisRunner unixsocket(String unixsocket) { + addConfigOption(REDIS_OPTIONS.UNIXSOCKET, unixsocket); + return this; + } + + public RedisRunner unixsocketperm(int unixsocketperm) { + addConfigOption(REDIS_OPTIONS.UNIXSOCKETPERM, unixsocketperm); + return this; + } + + public RedisRunner timeout(long timeout) { + addConfigOption(REDIS_OPTIONS.TIMEOUT, timeout); + return this; + } + + public RedisRunner tcpKeepalive(long tcpKeepalive) { + addConfigOption(REDIS_OPTIONS.TCP_KEEPALIVE, tcpKeepalive); + return this; + } + + public RedisRunner loglevel(LOGLEVEL_OPTIONS loglevel) { + addConfigOption(REDIS_OPTIONS.LOGLEVEL, loglevel.toString()); + return this; + } + + public RedisRunner logfile(String logfile) { + addConfigOption(REDIS_OPTIONS.LOGLEVEL, logfile); + return this; + } + + public RedisRunner syslogEnabled(boolean syslogEnabled) { + addConfigOption(REDIS_OPTIONS.SYSLOG_ENABLED, convertBoolean(syslogEnabled)); + return this; + } + + public RedisRunner syslogIdent(String syslogIdent) { + addConfigOption(REDIS_OPTIONS.SYSLOG_IDENT, syslogIdent); + return this; + } + + public RedisRunner syslogFacility(SYSLOG_FACILITY_OPTIONS syslogFacility) { + addConfigOption(REDIS_OPTIONS.SYSLOG_IDENT, syslogFacility.toString()); + return this; + } + + public RedisRunner databases(int databases) { + addConfigOption(REDIS_OPTIONS.DATABASES, databases); + return this; + } + + public RedisRunner save(long seconds, long changes) { + if (!nosave) { + addConfigOption(REDIS_OPTIONS.SAVE, seconds, changes); + } + return this; + } + + /** + * Phantom option + * + * @return RedisRunner + */ + public RedisRunner nosave() { + this.nosave = true; + options.remove(REDIS_OPTIONS.SAVE); +// addConfigOption(REDIS_OPTIONS.SAVE, "''"); + return this; + } + + public RedisRunner stopWritesOnBgsaveError(boolean stopWritesOnBgsaveError) { + addConfigOption(REDIS_OPTIONS.STOP_WRITES_ON_BGSAVE_ERROR, convertBoolean(stopWritesOnBgsaveError)); + return this; + } + + public RedisRunner rdbcompression(boolean rdbcompression) { + addConfigOption(REDIS_OPTIONS.RDBCOMPRESSION, convertBoolean(rdbcompression)); + return this; + } + + public RedisRunner rdbchecksum(boolean rdbchecksum) { + addConfigOption(REDIS_OPTIONS.RDBCHECKSUM, convertBoolean(rdbchecksum)); + return this; + } + + public RedisRunner dbfilename(String dbfilename) { + addConfigOption(REDIS_OPTIONS.DBFILENAME, dbfilename); + return this; + } + + public RedisRunner dir(String dir) { + if (!randomDir) { + addConfigOption(REDIS_OPTIONS.DIR, dir); + this.path = dir; + } + return this; + } + + /** + * Phantom option + * + * @return RedisRunner + */ + public RedisRunner randomDir() { + this.randomDir = true; + options.remove(REDIS_OPTIONS.DIR); + makeRandomDefaultDir(); + + + addConfigOption(REDIS_OPTIONS.DIR, "\"" + defaultDir + "\""); + return this; + } + + public RedisRunner slaveof(Inet4Address masterip, int port) { + addConfigOption(REDIS_OPTIONS.SLAVEOF, masterip.getHostAddress(), port); + return this; + } + + public RedisRunner slaveof(String masterip, int port) { + addConfigOption(REDIS_OPTIONS.SLAVEOF, masterip, port); + return this; + } + + public RedisRunner masterauth(String masterauth) { + addConfigOption(REDIS_OPTIONS.MASTERAUTH, masterauth); + return this; + } + + public RedisRunner slaveServeStaleData(boolean slaveServeStaleData) { + addConfigOption(REDIS_OPTIONS.SLAVE_SERVE_STALE_DATA, convertBoolean(slaveServeStaleData)); + return this; + } + + public RedisRunner slaveReadOnly(boolean slaveReadOnly) { + addConfigOption(REDIS_OPTIONS.SLAVE_READ_ONLY, convertBoolean(slaveReadOnly)); + return this; + } + + public RedisRunner replDisklessSync(boolean replDisklessSync) { + addConfigOption(REDIS_OPTIONS.REPL_DISKLESS_SYNC, convertBoolean(replDisklessSync)); + return this; + } + + public RedisRunner replDisklessSyncDelay(long replDisklessSyncDelay) { + addConfigOption(REDIS_OPTIONS.REPL_DISKLESS_SYNC_DELAY, replDisklessSyncDelay); + return this; + } + + public RedisRunner replPingSlavePeriod(long replPingSlavePeriod) { + addConfigOption(REDIS_OPTIONS.REPL_PING_SLAVE_PERIOD, replPingSlavePeriod); + return this; + } + + public RedisRunner replTimeout(long replTimeout) { + addConfigOption(REDIS_OPTIONS.REPL_TIMEOUT, replTimeout); + return this; + } + + public RedisRunner replDisableTcpNodelay(boolean replDisableTcpNodelay) { + addConfigOption(REDIS_OPTIONS.REPL_DISABLE_TCP_NODELAY, convertBoolean(replDisableTcpNodelay)); + return this; + } + + public RedisRunner replBacklogSize(String replBacklogSize) { + addConfigOption(REDIS_OPTIONS.REPL_BACKLOG_SIZE, replBacklogSize); + return this; + } + + public RedisRunner replBacklogTtl(long replBacklogTtl) { + addConfigOption(REDIS_OPTIONS.REPL_BACKLOG_TTL, replBacklogTtl); + return this; + } + + public RedisRunner slavePriority(long slavePriority) { + addConfigOption(REDIS_OPTIONS.SLAVE_PRIORITY, slavePriority); + return this; + } + + public RedisRunner minSlaveToWrite(long minSlaveToWrite) { + addConfigOption(REDIS_OPTIONS.MIN_SLAVES_TO_WRITE, minSlaveToWrite); + return this; + } + + public RedisRunner minSlaveMaxLag(long minSlaveMaxLag) { + addConfigOption(REDIS_OPTIONS.MIN_SLAVES_MAX_LAG, minSlaveMaxLag); + return this; + } + + public RedisRunner requirepass(String requirepass) { + addConfigOption(REDIS_OPTIONS.REQUIREPASS, requirepass); + return this; + } + + public RedisRunner renameCommand(String renameCommand) { + addConfigOption(REDIS_OPTIONS.RENAME_COMMAND, renameCommand); + return this; + } + + public RedisRunner maxclients(long maxclients) { + addConfigOption(REDIS_OPTIONS.MAXCLIENTS, maxclients); + return this; + } + + public RedisRunner maxmemory(String maxmemory) { + addConfigOption(REDIS_OPTIONS.MAXMEMORY, maxmemory); + return this; + } + + public RedisRunner maxmemoryPolicy(MAX_MEMORY_POLICY_OPTIONS maxmemoryPolicy) { + addConfigOption(REDIS_OPTIONS.MAXMEMORY, maxmemoryPolicy.toString()); + return this; + } + + public RedisRunner maxmemorySamples(long maxmemorySamples) { + addConfigOption(REDIS_OPTIONS.MAXMEMORY, maxmemorySamples); + return this; + } + + public RedisRunner appendonly(boolean appendonly) { + addConfigOption(REDIS_OPTIONS.APPENDONLY, convertBoolean(appendonly)); + return this; + } + + public RedisRunner appendfilename(String appendfilename) { + addConfigOption(REDIS_OPTIONS.APPENDFILENAME, appendfilename); + return this; + } + + public RedisRunner appendfsync(APPEND_FSYNC_MODE_OPTIONS appendfsync) { + addConfigOption(REDIS_OPTIONS.APPENDFSYNC, appendfsync.toString()); + return this; + } + + public RedisRunner noAppendfsyncOnRewrite(boolean noAppendfsyncOnRewrite) { + addConfigOption(REDIS_OPTIONS.NO_APPENDFSYNC_ON_REWRITE, convertBoolean(noAppendfsyncOnRewrite)); + return this; + } + + public RedisRunner autoAofRewritePercentage(int autoAofRewritePercentage) { + addConfigOption(REDIS_OPTIONS.AUTO_AOF_REWRITE_PERCENTAGE, autoAofRewritePercentage); + return this; + } + + public RedisRunner autoAofRewriteMinSize(String autoAofRewriteMinSize) { + addConfigOption(REDIS_OPTIONS.AUTO_AOF_REWRITE_MIN_SIZE, autoAofRewriteMinSize); + return this; + } + + public RedisRunner aofLoadTruncated(boolean aofLoadTruncated) { + addConfigOption(REDIS_OPTIONS.AOF_LOAD_TRUNCATED, convertBoolean(aofLoadTruncated)); + return this; + } + + public RedisRunner luaTimeLimit(long luaTimeLimit) { + addConfigOption(REDIS_OPTIONS.AOF_LOAD_TRUNCATED, luaTimeLimit); + return this; + } + + public RedisRunner clusterEnabled(boolean clusterEnabled) { + addConfigOption(REDIS_OPTIONS.CLUSTER_ENABLED, convertBoolean(clusterEnabled)); + return this; + } + + public RedisRunner clusterConfigFile(String clusterConfigFile) { + addConfigOption(REDIS_OPTIONS.CLUSTER_CONFIG_FILE, clusterConfigFile); + this.clusterFile = clusterConfigFile; + return this; + } + + public RedisRunner clusterNodeTimeout(long clusterNodeTimeout) { + addConfigOption(REDIS_OPTIONS.CLUSTER_NODE_TIMEOUT, clusterNodeTimeout); + return this; + } + + public RedisRunner clusterSlaveValidityFactor(long clusterSlaveValidityFactor) { + addConfigOption(REDIS_OPTIONS.CLUSTER_SLAVE_VALIDITY_FACTOR, clusterSlaveValidityFactor); + return this; + } + + public RedisRunner clusterMigrationBarrier(long clusterMigrationBarrier) { + addConfigOption(REDIS_OPTIONS.CLUSTER_MIGRATION_BARRIER, clusterMigrationBarrier); + return this; + } + + public RedisRunner clusterRequireFullCoverage(boolean clusterRequireFullCoverage) { + addConfigOption(REDIS_OPTIONS.CLUSTER_REQUIRE_FULL_COVERAGE, convertBoolean(clusterRequireFullCoverage)); + return this; + } + + public RedisRunner slowlogLogSlowerThan(long slowlogLogSlowerThan) { + addConfigOption(REDIS_OPTIONS.SLOWLOG_LOG_SLOWER_THAN, slowlogLogSlowerThan); + return this; + } + + public RedisRunner slowlogMaxLen(long slowlogMaxLen) { + addConfigOption(REDIS_OPTIONS.SLOWLOG_MAX_LEN, slowlogMaxLen); + return this; + } + + public RedisRunner latencyMonitorThreshold(long latencyMonitorThreshold) { + addConfigOption(REDIS_OPTIONS.LATENCY_MONITOR_THRESHOLD, latencyMonitorThreshold); + return this; + } + + public RedisRunner notifyKeyspaceEvents(KEYSPACE_EVENTS_OPTIONS... notifyKeyspaceEvents) { + String existing = this.options.getOrDefault(REDIS_OPTIONS.NOTIFY_KEYSPACE_EVENTS, ""); + + String events = Arrays.stream(notifyKeyspaceEvents) + .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append).toString(); + + addConfigOption(REDIS_OPTIONS.NOTIFY_KEYSPACE_EVENTS, + existing.contains(events) + ? existing + : (existing + events)); + return this; + } + + public RedisRunner hashMaxZiplistEntries(long hashMaxZiplistEntries) { + addConfigOption(REDIS_OPTIONS.HASH_MAX_ZIPLIST_ENTRIES, hashMaxZiplistEntries); + return this; + } + + public RedisRunner hashMaxZiplistValue(long hashMaxZiplistValue) { + addConfigOption(REDIS_OPTIONS.HASH_MAX_ZIPLIST_VALUE, hashMaxZiplistValue); + return this; + } + + public RedisRunner listMaxZiplistEntries(long listMaxZiplistEntries) { + addConfigOption(REDIS_OPTIONS.LIST_MAX_ZIPLIST_ENTRIES, listMaxZiplistEntries); + return this; + } + + public RedisRunner listMaxZiplistValue(long listMaxZiplistValue) { + addConfigOption(REDIS_OPTIONS.LIST_MAX_ZIPLIST_VALUE, listMaxZiplistValue); + return this; + } + + public RedisRunner setMaxIntsetEntries(long setMaxIntsetEntries) { + addConfigOption(REDIS_OPTIONS.SET_MAX_INTSET_ENTRIES, setMaxIntsetEntries); + return this; + } + + public RedisRunner zsetMaxZiplistEntries(long zsetMaxZiplistEntries) { + addConfigOption(REDIS_OPTIONS.ZSET_MAX_ZIPLIST_ENTRIES, zsetMaxZiplistEntries); + return this; + } + + public RedisRunner zsetMaxZiplistValue(long zsetMaxZiplistValue) { + addConfigOption(REDIS_OPTIONS.ZSET_MAX_ZIPLIST_VALUE, zsetMaxZiplistValue); + return this; + } + + public RedisRunner hllSparseMaxBytes(long hllSparseMaxBytes) { + addConfigOption(REDIS_OPTIONS.HLL_SPARSE_MAX_BYTES, hllSparseMaxBytes); + return this; + } + + public RedisRunner activerehashing(boolean activerehashing) { + addConfigOption(REDIS_OPTIONS.ACTIVEREHASHING, convertBoolean(activerehashing)); + return this; + } + + public RedisRunner clientOutputBufferLimit$Normal(String hardLimit, String softLimit, long softSeconds) { + addConfigOption(REDIS_OPTIONS.CLIENT_OUTPUT_BUFFER_LIMIT$NORMAL, hardLimit, softLimit, softSeconds); + return this; + } + + public RedisRunner clientOutputBufferLimit$Slave(String hardLimit, String softLimit, long softSeconds) { + addConfigOption(REDIS_OPTIONS.CLIENT_OUTPUT_BUFFER_LIMIT$SLAVE, hardLimit, softLimit, softSeconds); + return this; + } + + public RedisRunner clientOutputBufferLimit$Pubsub(String hardLimit, String softLimit, long softSeconds) { + addConfigOption(REDIS_OPTIONS.CLIENT_OUTPUT_BUFFER_LIMIT$PUBSUB, hardLimit, softLimit, softSeconds); + return this; + } + + public RedisRunner hz(int hz) { + addConfigOption(REDIS_OPTIONS.HZ, hz); + return this; + } + + public RedisRunner aofRewriteIncrementalFsync(boolean aofRewriteIncrementalFsync) { + addConfigOption(REDIS_OPTIONS.AOF_REWRITE_INCREMENTAL_FSYNC, convertBoolean(aofRewriteIncrementalFsync)); + return this; + } + + public RedisRunner protectedMode(boolean protectedMode) { + addConfigOption(REDIS_OPTIONS.PROTECTED_MODE, convertBoolean(protectedMode)); + return this; + } + + public RedisRunner sentinel() { + sentinelFile = "sentinel_conf_" + UUID.randomUUID() + ".conf"; + return this; + } + + public RedisRunner sentinelAnnounceIP(String sentinelAnnounceIP) { + addConfigOption(REDIS_OPTIONS.SENTINEL$ANNOUNCE_IP, sentinelAnnounceIP); + return this; + } + + public RedisRunner sentinelAnnouncePort(int sentinelAnnouncePort) { + addConfigOption(REDIS_OPTIONS.SENTINEL$ANNOUNCE_PORT, sentinelAnnouncePort); + return this; + } + + public RedisRunner sentinelMonitor(String masterName, String ip, int port, int quorum) { + addConfigOption(REDIS_OPTIONS.SENTINEL$MONITOR, masterName, ip, port, quorum); + return this; + } + + public RedisRunner sentinelAuthPass(String masterName, String password) { + addConfigOption(REDIS_OPTIONS.SENTINEL$AUTH_PASS, masterName, password); + return this; + } + + public RedisRunner sentinelDownAfterMilliseconds(String masterName, long downAfterMilliseconds) { + addConfigOption(REDIS_OPTIONS.SENTINEL$DOWN_AFTER_MILLISECONDS, masterName, downAfterMilliseconds); + return this; + } + + public RedisRunner sentinelParallelSyncs(String masterName, int numSlaves) { + addConfigOption(REDIS_OPTIONS.SENTINEL$PARALLEL_SYNCS, masterName, numSlaves); + return this; + } + + public RedisRunner sentinelFailoverTimeout(String masterName, long failoverTimeout) { + addConfigOption(REDIS_OPTIONS.SENTINEL$FAILOVER_TIMEOUT, masterName, failoverTimeout); + return this; + } + + public RedisRunner sentinelNotificationScript(String masterName, String scriptPath) { + addConfigOption(REDIS_OPTIONS.SENTINEL$NOTIFICATION_SCRIPT, masterName, scriptPath); + return this; + } + + public RedisRunner sentinelClientReconfigScript(String masterName, String scriptPath) { + addConfigOption(REDIS_OPTIONS.SENTINEL$CLIENT_RECONFIG_SCRIPT, masterName, scriptPath); + return this; + } + + public boolean isSentinel() { + return this.sentinelFile != null; + } + + public boolean isCluster() { + return this.clusterFile != null; + } + + public boolean isRandomDir() { + return this.randomDir; + } + + public boolean isNosave() { + return this.nosave; + } + + public String defaultDir() { + return this.defaultDir; + } + + public String dir() { + return isRandomDir() ? defaultDir() : this.path; + } + + public String getInitialBindAddr() { + return bindAddr.size() > 0 ? bindAddr.get(0) : "localhost"; + } + + public boolean deleteDBfileDir() { + File f = new File(defaultDir); + if (f.exists()) { + System.out.println("REDIS RUNNER: Deleting directory " + f.getAbsolutePath()); + return f.delete(); + } + return false; + } + + public boolean deleteSentinelFile() { + File f = new File(defaultDir + File.separator + sentinelFile); + if (f.exists()) { + System.out.println("REDIS RUNNER: Deleting sentinel config file " + f.getAbsolutePath()); + return f.delete(); + } + return false; + } + + public boolean deleteClusterFile() { + File f = new File(clusterFile); + if (f.exists() && isRandomDir()) { + System.out.println("REDIS RUNNER: Deleting cluster config file " + f.getAbsolutePath()); + return f.delete(); + } + return false; + } + + private void makeRandomDefaultDir() { + File f = new File(RedissonRuntimeEnvironment.tempDir + File.separator + UUID.randomUUID()); + if (f.exists()) { + makeRandomDefaultDir(); + } else { + System.out.println("REDIS RUNNER: Making directory " + f.getAbsolutePath()); + f.mkdirs(); + this.defaultDir = f.getAbsolutePath(); + if (RedissonRuntimeEnvironment.isWindows) { + defaultDir = defaultDir.replace("\\", "\\\\"); + } + } + } + + public static final class RedisProcess { + + private final Process redisProcess; + private final RedisRunner runner; + private RedisVersion redisVersion; + + private RedisProcess(Process redisProcess, RedisRunner runner) { + this.redisProcess = redisProcess; + this.runner = runner; + } + + public int stop() { + if (runner.isNosave() && !runner.isRandomDir()) { + RedisClient c = createDefaultRedisClientInstance(); + RedisConnection connection = c.connect(); + try { + connection.async(new RedisStrictCommand("SHUTDOWN", "NOSAVE", new VoidReplayConvertor())) + .toCompletableFuture().get(3, TimeUnit.SECONDS); + } catch (InterruptedException interruptedException) { + //shutdown via command failed, lets wait and kill it later. + } catch (ExecutionException | TimeoutException e) { + // skip + } + c.shutdown(); + connection.closeAsync().syncUninterruptibly(); + } + Process p = redisProcess; + p.destroy(); + boolean normalTermination = false; + try { + normalTermination = p.waitFor(5, TimeUnit.SECONDS); + } catch (InterruptedException ex) { + //OK lets hurry up by force kill; + } + if (!normalTermination) { + p = p.destroyForcibly(); + } + cleanup(); + int exitCode = p.exitValue(); + return exitCode == 1 && RedissonRuntimeEnvironment.isWindows ? 0 : exitCode; + } + + private void cleanup() { + if (runner.isSentinel()) { + runner.deleteSentinelFile(); + } + if (runner.isCluster()) { + runner.deleteClusterFile(); + } + if (runner.isRandomDir()) { + runner.deleteDBfileDir(); + } + } + + public String getDefaultDir() { + return runner.getDefaultDir(); + } + + public Process getRedisProcess() { + return redisProcess; + } + + public RedisClient createRedisClientInstance() { + if (redisProcess.isAlive()) { + RedisClientConfig config = new RedisClientConfig(); + config.setAddress(runner.getInitialBindAddr(), runner.getPort()); + return RedisClient.create(config); + } + throw new IllegalStateException("Redis server instance is not running."); + } + + public RedisVersion getRedisVersion() { + if (redisVersion == null) { + RedisConnection c = createRedisClientInstance().connect(); + Map serverMap = c.sync(RedisCommands.INFO_SERVER); + redisVersion = new RedisVersion(serverMap.get("redis_version")); + c.closeAsync(); + } + return redisVersion; + } + + public int getRedisServerPort() { + return runner.getPort(); + } + + public String getRedisServerBindAddress() { + return runner.getInitialBindAddr(); + } + + public String getRedisServerAddressAndPort() { + return "redis://" + getRedisServerBindAddress() + ":" + getRedisServerPort(); + } + + public boolean isAlive() { + return redisProcess.isAlive(); + } + } + + public static RedisRunner.RedisProcess startDefaultRedisServerInstance() throws IOException, InterruptedException, FailedToStartRedisException { + if (defaultRedisInstance == null) { + System.out.println("REDIS RUNNER: Starting up default instance..."); + defaultRedisInstance = new RedisRunner().nosave().randomDir().randomPort().run(); + } + return defaultRedisInstance; + } + + public static int shutDownDefaultRedisServerInstance() throws InterruptedException { + if (defaultRedisInstance != null) { + System.out.println("REDIS RUNNER: Shutting down default instance..."); + try { + defaultRedisInstanceExitCode = defaultRedisInstance.stop(); + } finally { + defaultRedisInstance = null; + } + } else { + System.out.println("REDIS RUNNER: Default instance is already down with an exit code " + defaultRedisInstanceExitCode); + } + return defaultRedisInstanceExitCode; + } + + public static boolean isDefaultRedisServerInstanceRunning() { + return defaultRedisInstance != null && defaultRedisInstance.redisProcess.isAlive(); + } + + public static RedisClient createDefaultRedisClientInstance() { + return defaultRedisInstance.createRedisClientInstance(); + } + + public String getDefaultDir() { + return defaultDir; + } + + public static RedisRunner.RedisProcess getDefaultRedisServerInstance() { + return defaultRedisInstance; + } + + public static String getDefaultRedisServerBindAddressAndPort() { + return "redis://" + defaultRedisInstance.getRedisServerBindAddress() + + ":" + + defaultRedisInstance.getRedisServerPort(); + } + + public static int findFreePort() { + ServerSocket socket = null; + try { + socket = new ServerSocket(0); + socket.setReuseAddress(true); + int port = socket.getLocalPort(); + if (port > 55535 && isFreePort(port - 10000)) { + return port - 10000; + } else { + return port; + } + } catch (IOException e) { + } finally { + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + } + } + } + throw new IllegalStateException("Could not find a free TCP/IP port."); + } + + public static boolean isFreePort(int port) { + ServerSocket socket = null; + try { + socket = new ServerSocket(port); + socket.setReuseAddress(true); + return true; + } catch (IOException e) { + } finally { + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + } + } + } + return false; + } + + public static class FailedToStartRedisException extends RuntimeException { + + public FailedToStartRedisException() { + } + } + +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/RedisVersion.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/RedisVersion.java new file mode 100644 index 000000000..1b4db4746 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/RedisVersion.java @@ -0,0 +1,58 @@ +package org.redisson; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * + * @author Rui Gu (https://github.com/jackygurui) + */ +public class RedisVersion implements Comparable{ + + private final String fullVersion; + private final Integer majorVersion; + private final Integer minorVersion; + private final Integer patchVersion; + + public RedisVersion(String fullVersion) { + this.fullVersion = fullVersion; + Matcher matcher = Pattern.compile("^([\\d]+)\\.([\\d]+)\\.([\\d]+)$").matcher(fullVersion); + matcher.find(); + majorVersion = Integer.parseInt(matcher.group(1)); + minorVersion = Integer.parseInt(matcher.group(2)); + patchVersion = Integer.parseInt(matcher.group(3)); + } + + public String getFullVersion() { + return fullVersion; + } + + public int getMajorVersion() { + return majorVersion; + } + + public int getMinorVersion() { + return minorVersion; + } + + public int getPatchVersion() { + return patchVersion; + } + + @Override + public int compareTo(RedisVersion o) { + int ma = this.majorVersion.compareTo(o.majorVersion); + int mi = this.minorVersion.compareTo(o.minorVersion); + int pa = this.patchVersion.compareTo(o.patchVersion); + return ma != 0 ? ma : mi != 0 ? mi : pa; + } + + public int compareTo(String redisVersion) { + return this.compareTo(new RedisVersion(redisVersion)); + } + + public static int compareTo(String redisVersion1, String redisVersion2) { + return new RedisVersion(redisVersion1).compareTo(redisVersion2); + } + +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/RedissonRuntimeEnvironment.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/RedissonRuntimeEnvironment.java new file mode 100644 index 000000000..6d426c601 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/RedissonRuntimeEnvironment.java @@ -0,0 +1,21 @@ +package org.redisson; + +import java.util.Locale; + +/** + * + * @author Rui Gu (https://github.com/jackygurui) + */ +public class RedissonRuntimeEnvironment { + + public static final boolean isTravis = "true".equalsIgnoreCase(System.getProperty("travisEnv")); + public static final String redisBinaryPath = System.getProperty("redisBinary", "C:\\redis\\redis-server.exe"); + public static final String tempDir = System.getProperty("java.io.tmpdir"); + public static final String OS; + public static final boolean isWindows; + + static { + OS = System.getProperty("os.name", "generic"); + isWindows = OS.toLowerCase(Locale.ENGLISH).contains("win"); + } +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/BaseConnectionTest.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/BaseConnectionTest.java new file mode 100644 index 000000000..770f1d283 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/BaseConnectionTest.java @@ -0,0 +1,16 @@ +package org.redisson.spring.data.connection; + +import org.junit.Before; +import org.redisson.BaseTest; +import org.springframework.data.redis.connection.RedisConnection; + +public abstract class BaseConnectionTest extends BaseTest { + + RedisConnection connection; + + @Before + public void init() { + connection = new RedissonConnection(redisson); + } + +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java new file mode 100644 index 000000000..c3233eb53 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java @@ -0,0 +1,120 @@ +package org.redisson.spring.data.connection; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.redisson.BaseTest; +import org.springframework.dao.InvalidDataAccessResourceUsageException; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + +@RunWith(Parameterized.class) +public class RedissonClusterConnectionRenameTest extends BaseTest { + + @Parameterized.Parameters(name= "{index} - same slot = {0}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {false}, + {true} + }); + } + + @Parameterized.Parameter(0) + public boolean sameSlot; + + byte[] originalKey = "key".getBytes(); + byte[] newKey = "unset".getBytes(); + byte[] value = "value".getBytes(); + + @Test + public void testRename() { + testInCluster(connection -> { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection); + + connection.rename(originalKey, newKey); + + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + }); + } + + @Test + public void testRename_pipeline() { + testInCluster(connection -> { + connection.set(originalKey, value); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection); + + connection.openPipeline(); + assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + }); + } + + protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot, RedissonClusterConnection connection) { + int counter = 0; + + byte[] newKey = (new String(originalKey) + counter).getBytes(); + + Integer newKeySlot = connection.clusterGetSlotForKey(newKey); + + while(!newKeySlot.equals(targetSlot)) { + counter++; + newKey = (new String(originalKey) + counter).getBytes(); + newKeySlot = connection.clusterGetSlotForKey(newKey); + } + + return newKey; + } + + @Test + public void testRenameNX() { + testInCluster(connection -> { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection); + + Boolean result = connection.renameNX(originalKey, newKey); + + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + assertThat(result).isTrue(); + + connection.set(originalKey, value); + + result = connection.renameNX(originalKey, newKey); + + assertThat(result).isFalse(); + }); + } + + @Test + public void testRenameNX_pipeline() { + testInCluster(connection -> { + connection.set(originalKey, value); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection); + + connection.openPipeline(); + assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + }); + } + + private Integer getTargetSlot(Integer originalSlot) { + return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1; + } + +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java new file mode 100644 index 000000000..1777b8f5f --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java @@ -0,0 +1,317 @@ +package org.redisson.spring.data.connection; + +import net.bytebuddy.utility.RandomString; +import org.junit.Test; +import org.redisson.BaseTest; +import org.redisson.api.RedissonClient; +import org.redisson.connection.MasterSlaveConnectionManager; +import org.springframework.data.redis.connection.ClusterInfo; +import org.springframework.data.redis.connection.RedisClusterNode; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisNode.NodeType; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.ScanOptions; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.types.RedisClientInfo; + +import java.nio.charset.StandardCharsets; +import java.util.*; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RedissonClusterConnectionTest extends BaseTest { + + @Test + public void testRandomKey() { + testInCluster(connection -> { + RedissonClient redisson = (RedissonClient) connection.getNativeConnection(); + StringRedisTemplate redisTemplate = new StringRedisTemplate(); + redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson)); + redisTemplate.afterPropertiesSet(); + + for (int i = 0; i < 10; i++) { + redisTemplate.opsForValue().set("i" + i, "i" + i); + } + + for (RedisClusterNode clusterNode : redisTemplate.getConnectionFactory().getClusterConnection().clusterGetNodes()) { + String key = redisTemplate.opsForCluster().randomKey(clusterNode); + assertThat(key).isNotNull(); + } + }); + } + + @Test + public void testDel() { + testInCluster(connection -> { + List keys = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + byte[] key = ("test" + i).getBytes(); + keys.add(key); + connection.set(key, ("test" + i).getBytes()); + } + assertThat(connection.del(keys.toArray(new byte[0][]))).isEqualTo(10); + }); + } + + @Test + public void testScan() { + testInCluster(connection -> { + Map map = new HashMap<>(); + for (int i = 0; i < 10000; i++) { + map.put(RandomString.make(32).getBytes(), RandomString.make(32).getBytes(StandardCharsets.UTF_8)); + } + connection.mSet(map); + + Cursor b = connection.scan(ScanOptions.scanOptions().build()); + Set sett = new HashSet<>(); + int counter = 0; + while (b.hasNext()) { + byte[] tt = b.next(); + sett.add(new String(tt)); + counter++; + } + assertThat(sett.size()).isEqualTo(map.size()); + assertThat(counter).isEqualTo(map.size()); + }); + } + + @Test + public void testMSet() { + testInCluster(connection -> { + Map map = new HashMap<>(); + for (int i = 0; i < 10; i++) { + map.put(("test" + i).getBytes(), ("test" + i*100).getBytes()); + } + connection.mSet(map); + for (Map.Entry entry : map.entrySet()) { + assertThat(connection.get(entry.getKey())).isEqualTo(entry.getValue()); + } + }); + } + + @Test + public void testMGet() { + testInCluster(connection -> { + Map map = new HashMap<>(); + for (int i = 0; i < 10; i++) { + map.put(("test" + i).getBytes(), ("test" + i*100).getBytes()); + } + connection.mSet(map); + List r = connection.mGet(map.keySet().toArray(new byte[0][])); + assertThat(r).containsExactly(map.values().toArray(new byte[0][])); + }); + } + + @Test + public void testClusterGetNodes() { + testInCluster(connection -> { + Iterable nodes = connection.clusterGetNodes(); + assertThat(nodes).hasSize(6); + for (RedisClusterNode redisClusterNode : nodes) { + assertThat(redisClusterNode.getLinkState()).isNotNull(); + assertThat(redisClusterNode.getFlags()).isNotEmpty(); + assertThat(redisClusterNode.getHost()).isNotNull(); + assertThat(redisClusterNode.getPort()).isNotNull(); + assertThat(redisClusterNode.getId()).isNotNull(); + assertThat(redisClusterNode.getType()).isNotNull(); + if (redisClusterNode.getType() == NodeType.MASTER) { + assertThat(redisClusterNode.getSlotRange().getSlots()).isNotEmpty(); + } else { + assertThat(redisClusterNode.getMasterId()).isNotNull(); + } + } + }); + } + + @Test + public void testClusterGetNodesMaster() { + testInCluster(connection -> { + Iterable nodes = connection.clusterGetNodes(); + for (RedisClusterNode redisClusterNode : nodes) { + if (redisClusterNode.getType() == NodeType.MASTER) { + Collection slaves = connection.clusterGetReplicas(redisClusterNode); + assertThat(slaves).hasSize(1); + } + } + }); + } + + @Test + public void testClusterGetMasterSlaveMap() { + testInCluster(connection -> { + Map> map = connection.clusterGetMasterReplicaMap(); + assertThat(map).hasSize(3); + for (Collection slaves : map.values()) { + assertThat(slaves).hasSize(1); + } + }); + } + + @Test + public void testClusterGetSlotForKey() { + testInCluster(connection -> { + Integer slot = connection.clusterGetSlotForKey("123".getBytes()); + assertThat(slot).isNotNull(); + }); + } + + @Test + public void testClusterGetNodeForSlot() { + testInCluster(connection -> { + RedisClusterNode node1 = connection.clusterGetNodeForSlot(1); + RedisClusterNode node2 = connection.clusterGetNodeForSlot(16000); + assertThat(node1.getId()).isNotEqualTo(node2.getId()); + }); + } + + @Test + public void testClusterGetNodeForKey() { + testInCluster(connection -> { + RedisClusterNode node = connection.clusterGetNodeForKey("123".getBytes()); + assertThat(node).isNotNull(); + }); + } + + @Test + public void testClusterGetClusterInfo() { + testInCluster(connection -> { + ClusterInfo info = connection.clusterGetClusterInfo(); + assertThat(info.getSlotsFail()).isEqualTo(0); + assertThat(info.getSlotsOk()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT); + assertThat(info.getSlotsAssigned()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT); + }); + } + + @Test + public void testClusterAddRemoveSlots() { + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + Integer slot = master.getSlotRange().getSlots().iterator().next(); + connection.clusterDeleteSlots(master, slot); + connection.clusterAddSlots(master, slot); + }); + } + + @Test + public void testClusterCountKeysInSlot() { + testInCluster(connection -> { + Long t = connection.clusterCountKeysInSlot(1); + assertThat(t).isZero(); + }); + } + + @Test + public void testClusterGetKeysInSlot() { + testInCluster(connection -> { + connection.flushAll(); + List keys = connection.clusterGetKeysInSlot(12, 10); + assertThat(keys).isEmpty(); + }); + } + + @Test + public void testClusterPing() { + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + String res = connection.ping(master); + assertThat(res).isEqualTo("PONG"); + }); + } + + @Test + public void testDbSize() { + testInCluster(connection -> { + connection.flushAll(); + RedisClusterNode master = getFirstMaster(connection); + Long size = connection.dbSize(master); + assertThat(size).isZero(); + }); + } + + @Test + public void testInfo() { + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + Properties info = connection.info(master); + assertThat(info.size()).isGreaterThan(10); + }); + } + + @Test + public void testDelPipeline() { + testInCluster(connection -> { + byte[] k = "key".getBytes(); + byte[] v = "val".getBytes(); + connection.set(k, v); + + connection.openPipeline(); + connection.get(k); + connection.del(k); + List results = connection.closePipeline(); + byte[] val = (byte[])results.get(0); + assertThat(val).isEqualTo(v); + Long res = (Long) results.get(1); + assertThat(res).isEqualTo(1); + }); + } + + @Test + public void testResetConfigStats() { + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + connection.resetConfigStats(master); + }); + } + + @Test + public void testTime() { + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + Long time = connection.time(master); + assertThat(time).isGreaterThan(1000); + }); + } + + @Test + public void testGetClientList() { + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + List list = connection.getClientList(master); + assertThat(list.size()).isGreaterThan(10); + }); + } + + @Test + public void testSetConfig() { + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + connection.setConfig(master, "timeout", "10"); + }); + } + + @Test + public void testGetConfig() { + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + Properties config = connection.getConfig(master, "*"); + assertThat(config.size()).isGreaterThan(20); + }); + } + + protected RedisClusterNode getFirstMaster(RedissonClusterConnection connection) { + Map> map = connection.clusterGetMasterReplicaMap(); + RedisClusterNode master = map.keySet().iterator().next(); + return master; + } + + @Test + public void testConnectionFactoryReturnsClusterConnection() { + testInCluster(connection -> { + RedissonClient redisson = (RedissonClient) connection.getNativeConnection(); + RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson); + + assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class); + }); + } + +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonConnectionTest.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonConnectionTest.java new file mode 100644 index 000000000..d2432fae3 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonConnectionTest.java @@ -0,0 +1,287 @@ +package org.redisson.spring.data.connection; + +import org.junit.Test; +import org.springframework.data.domain.Range; +import org.springframework.data.geo.Circle; +import org.springframework.data.geo.GeoResults; +import org.springframework.data.geo.Point; +import org.springframework.data.redis.connection.Limit; +import org.springframework.data.redis.connection.ReactiveRedisConnection; +import org.springframework.data.redis.connection.RedisGeoCommands; +import org.springframework.data.redis.connection.RedisStringCommands.SetOption; +import org.springframework.data.redis.connection.RedisZSetCommands; +import org.springframework.data.redis.connection.zset.Tuple; +import org.springframework.data.redis.core.*; +import org.springframework.data.redis.core.types.Expiration; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RedissonConnectionTest extends BaseConnectionTest { + + @Test + public void testZRandMemberScore() { + StringRedisTemplate redisTemplate = new StringRedisTemplate(); + redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson)); + redisTemplate.afterPropertiesSet(); + + redisTemplate.boundZSetOps("test").add("1", 10); + redisTemplate.boundZSetOps("test").add("2", 20); + redisTemplate.boundZSetOps("test").add("3", 30); + + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + ReactiveRedisConnection cc = factory.getReactiveConnection(); + + Tuple b = cc.zSetCommands().zRandMemberWithScore(ByteBuffer.wrap("test".getBytes())).block(); + assertThat(b.getScore()).isNotNaN(); + assertThat(new String(b.getValue())).isIn("1", "2", "3"); + } + + @Test + public void testBZPop() { + StringRedisTemplate redisTemplate = new StringRedisTemplate(); + redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson)); + redisTemplate.afterPropertiesSet(); + + redisTemplate.boundZSetOps("test").add("1", 10); + redisTemplate.boundZSetOps("test").add("2", 20); + redisTemplate.boundZSetOps("test").add("3", 30); + + ZSetOperations.TypedTuple r = redisTemplate.boundZSetOps("test").popMin(Duration.ofSeconds(1)); + assertThat(r.getValue()).isEqualTo("1"); + assertThat(r.getScore()).isEqualTo(10); + + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + ReactiveRedisConnection cc = factory.getReactiveConnection(); + + Tuple r2 = cc.zSetCommands().bZPopMin(ByteBuffer.wrap("test".getBytes()), Duration.ofSeconds(1)).block(); + assertThat(r2.getValue()).isEqualTo("2".getBytes()); + assertThat(r2.getScore()).isEqualTo(20); + } + + @Test + public void testZPop() { + StringRedisTemplate redisTemplate = new StringRedisTemplate(); + redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson)); + redisTemplate.afterPropertiesSet(); + + redisTemplate.boundZSetOps("test").add("1", 10); + redisTemplate.boundZSetOps("test").add("2", 20); + redisTemplate.boundZSetOps("test").add("3", 30); + + ZSetOperations.TypedTuple r = redisTemplate.boundZSetOps("test").popMin(); + assertThat(r.getValue()).isEqualTo("1"); + assertThat(r.getScore()).isEqualTo(10); + + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + ReactiveRedisConnection cc = factory.getReactiveConnection(); + + Tuple r2 = cc.zSetCommands().zPopMin(ByteBuffer.wrap("test".getBytes())).block(); + assertThat(r2.getValue()).isEqualTo("2".getBytes()); + assertThat(r2.getScore()).isEqualTo(20); + } + + @Test + public void testZRangeWithScores() { + StringRedisTemplate redisTemplate = new StringRedisTemplate(); + redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson)); + redisTemplate.afterPropertiesSet(); + + redisTemplate.boundZSetOps("test").add("1", 10); + redisTemplate.boundZSetOps("test").add("2", 20); + redisTemplate.boundZSetOps("test").add("3", 30); + + Set> objs = redisTemplate.boundZSetOps("test").rangeWithScores(0, 100); + assertThat(objs).hasSize(3); + assertThat(objs).containsExactlyInAnyOrder(ZSetOperations.TypedTuple.of("1", 10D), + ZSetOperations.TypedTuple.of("2", 20D), + ZSetOperations.TypedTuple.of("3", 30D)); + } + + @Test + public void testZDiff() { + StringRedisTemplate redisTemplate = new StringRedisTemplate(); + redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson)); + redisTemplate.afterPropertiesSet(); + + redisTemplate.boundZSetOps("test").add("1", 10); + redisTemplate.boundZSetOps("test").add("2", 20); + redisTemplate.boundZSetOps("test").add("3", 30); + redisTemplate.boundZSetOps("test").add("4", 30); + + redisTemplate.boundZSetOps("test2").add("5", 50); + redisTemplate.boundZSetOps("test2").add("2", 20); + redisTemplate.boundZSetOps("test2").add("3", 30); + redisTemplate.boundZSetOps("test2").add("6", 60); + + Set objs = redisTemplate.boundZSetOps("test").difference("test2"); + assertThat(objs).hasSize(2); + } + + @Test + public void testZLexCount() { + StringRedisTemplate redisTemplate = new StringRedisTemplate(); + redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson)); + redisTemplate.afterPropertiesSet(); + + redisTemplate.boundZSetOps("test").add("1", 10); + redisTemplate.boundZSetOps("test").add("2", 20); + redisTemplate.boundZSetOps("test").add("3", 30); + + Long size = redisTemplate.boundZSetOps("test").lexCount(Range.closed("1", "2")); + assertThat(size).isEqualTo(2); + } + + @Test + public void testZRemLexByRange() { + StringRedisTemplate redisTemplate = new StringRedisTemplate(); + redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson)); + redisTemplate.afterPropertiesSet(); + + redisTemplate.boundZSetOps("test").add("1", 10); + redisTemplate.boundZSetOps("test").add("2", 20); + redisTemplate.boundZSetOps("test").add("3", 30); + + Long size = redisTemplate.boundZSetOps("test") + .removeRangeByLex(Range.closed("1", "2")); + assertThat(size).isEqualTo(2); + } + + @Test + public void testReverseRangeByLex() { + StringRedisTemplate redisTemplate = new StringRedisTemplate(); + redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson)); + redisTemplate.afterPropertiesSet(); + + redisTemplate.boundZSetOps("test").add("1", 10); + redisTemplate.boundZSetOps("test").add("2", 20); + + Set ops = redisTemplate.boundZSetOps("test") + .reverseRangeByLex(Range.closed("1", "2") + , Limit.limit().count(10)); + assertThat(ops.size()).isEqualTo(2); + } + + @Test + public void testExecute() { + Long s = (Long) connection.execute("ttl", "key".getBytes()); + assertThat(s).isEqualTo(-2); + connection.execute("flushDb"); + } + + @Test + public void testRandomMembers() { + RedisTemplate redisTemplate = new RedisTemplate<>(); + redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson)); + redisTemplate.afterPropertiesSet(); + + + SetOperations ops = redisTemplate.opsForSet(); + ops.add("val", 1, 2, 3, 4); + Set values = redisTemplate.opsForSet().distinctRandomMembers("val", 1L); + assertThat(values).containsAnyOf(1, 2, 3, 4); + + Integer v = redisTemplate.opsForSet().randomMember("val"); + assertThat(v).isNotNull(); + } + + @Test + public void testRangeByLex() { + RedisTemplate redisTemplate = new RedisTemplate<>(); + redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson)); + redisTemplate.afterPropertiesSet(); + + RedisZSetCommands.Range range = new RedisZSetCommands.Range(); + range.lt("c"); + Set zSetValue = redisTemplate.opsForZSet().rangeByLex("val", range); + assertThat(zSetValue).isEmpty(); + } + + @Test + public void testGeo() { + RedisTemplate redisTemplate = new RedisTemplate<>(); + redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson)); + redisTemplate.afterPropertiesSet(); + + String key = "test_geo_key"; + Point point = new Point(116.401001, 40.119499); + redisTemplate.opsForGeo().add(key, point, "a"); + + point = new Point(111.545998, 36.133499); + redisTemplate.opsForGeo().add(key, point, "b"); + + point = new Point(111.483002, 36.030998); + redisTemplate.opsForGeo().add(key, point, "c"); + Circle within = new Circle(116.401001, 40.119499, 80000); + RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeCoordinates(); + GeoResults> res = redisTemplate.opsForGeo().radius(key, within, args); + assertThat(res.getContent().get(0).getContent().getName()).isEqualTo("a"); + } + + @Test + public void testZSet() { + connection.zAdd(new byte[] {1}, -1, new byte[] {1}); + connection.zAdd(new byte[] {1}, 2, new byte[] {2}); + connection.zAdd(new byte[] {1}, 10, new byte[] {3}); + + assertThat(connection.zRangeByScore(new byte[] {1}, Double.NEGATIVE_INFINITY, 5)) + .containsOnly(new byte[] {1}, new byte[] {2}); + } + + @Test + public void testEcho() { + assertThat(connection.echo("test".getBytes())).isEqualTo("test".getBytes()); + } + + @Test + public void testSetGet() { + connection.set("key".getBytes(), "value".getBytes()); + assertThat(connection.get("key".getBytes())).isEqualTo("value".getBytes()); + } + + @Test + public void testSetExpiration() { + assertThat(connection.set("key".getBytes(), "value".getBytes(), Expiration.milliseconds(111122), SetOption.SET_IF_ABSENT)).isTrue(); + assertThat(connection.get("key".getBytes())).isEqualTo("value".getBytes()); + } + + @Test + public void testHSetGet() { + assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isTrue(); + assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isEqualTo("value".getBytes()); + } + + @Test + public void testZScan() { + connection.zAdd("key".getBytes(), 1, "value1".getBytes()); + connection.zAdd("key".getBytes(), 2, "value2".getBytes()); + + Cursor t = connection.zScan("key".getBytes(), ScanOptions.scanOptions().build()); + assertThat(t.hasNext()).isTrue(); + assertThat(t.next().getValue()).isEqualTo("value1".getBytes()); + assertThat(t.hasNext()).isTrue(); + assertThat(t.next().getValue()).isEqualTo("value2".getBytes()); + } + + @Test + public void testRandFieldWithValues() { + connection.hSet("map".getBytes(), "key1".getBytes(), "value1".getBytes()); + connection.hSet("map".getBytes(), "key2".getBytes(), "value2".getBytes()); + connection.hSet("map".getBytes(), "key3".getBytes(), "value3".getBytes()); + + List> s = connection.hRandFieldWithValues("map".getBytes(), 2); + assertThat(s).hasSize(2); + + Map.Entry s2 = connection.hRandFieldWithValues("map".getBytes()); + assertThat(s2).isNotNull(); + + byte[] f = connection.hRandField("map".getBytes()); + assertThat((Object) f).isIn("key1".getBytes(), "key2".getBytes(), "key3".getBytes()); + } + +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonMultiConnectionTest.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonMultiConnectionTest.java new file mode 100644 index 000000000..91c83f751 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonMultiConnectionTest.java @@ -0,0 +1,96 @@ +package org.redisson.spring.data.connection; + +import static org.assertj.core.api.Assertions.*; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; +import org.redisson.BaseTest; +import org.springframework.dao.DataAccessException; +import org.springframework.data.redis.core.RedisOperations; +import org.springframework.data.redis.core.SessionCallback; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.ValueOperations; + +public class RedissonMultiConnectionTest extends BaseConnectionTest { + + @Test + public void testBroken() throws InterruptedException { + StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(new RedissonConnectionFactory(redisson)); + ExecutorService e = Executors.newFixedThreadPool(32); + AtomicBoolean hasErrors = new AtomicBoolean(); + for (int i = 0; i < 10; i++) { + e.submit(() -> { + stringRedisTemplate.execute(new SessionCallback() { + @Override + public Void execute(RedisOperations operations) throws DataAccessException { + try { + ValueOperations valueOps = operations.opsForValue(); + operations.multi(); + valueOps.set("test3", "value"); + } catch (Exception e) { + e.printStackTrace(); + hasErrors.set(true); + } + return null; + } + }); + stringRedisTemplate.execute(new SessionCallback() { + @Override + public Void execute(RedisOperations operations) throws DataAccessException { + try { + ValueOperations valueOps = operations.opsForValue(); + valueOps.set("test1", "value"); + assertThat(valueOps.get("test1")).isEqualTo("value"); + } catch (Exception e) { + e.printStackTrace(); + hasErrors.set(true); + } + return null; + } + }); + }); + } + e.shutdown(); + e.awaitTermination(1, TimeUnit.MINUTES); + assertThat(hasErrors).isFalse(); + } + + @Test + public void testEcho() { + RedissonConnection connection = new RedissonConnection(redisson); + connection.multi(); + assertThat(connection.echo("test".getBytes())).isNull(); + assertThat(connection.exec().iterator().next()).isEqualTo("test".getBytes()); + } + + @Test + public void testSetGet() { + RedissonConnection connection = new RedissonConnection(redisson); + connection.multi(); + assertThat(connection.isQueueing()).isTrue(); + connection.set("key".getBytes(), "value".getBytes()); + assertThat(connection.get("key".getBytes())).isNull(); + + List result = connection.exec(); + assertThat(connection.isQueueing()).isFalse(); + assertThat(result.get(0)).isEqualTo("value".getBytes()); + } + + @Test + public void testHSetGet() { + RedissonConnection connection = new RedissonConnection(redisson); + connection.multi(); + assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isNull(); + assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isNull(); + + List result = connection.exec(); + assertThat((Boolean)result.get(0)).isTrue(); + assertThat(result.get(1)).isEqualTo("value".getBytes()); + } + +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonPipelineConnectionTest.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonPipelineConnectionTest.java new file mode 100644 index 000000000..643e7423a --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonPipelineConnectionTest.java @@ -0,0 +1,64 @@ +package org.redisson.spring.data.connection; + +import static org.assertj.core.api.Assertions.*; + +import java.util.List; + +import org.junit.Test; +import org.redisson.BaseTest; + +public class RedissonPipelineConnectionTest extends BaseConnectionTest { + + @Test + public void testDel() { + RedissonConnection connection = new RedissonConnection(redisson); + byte[] key = "my_key".getBytes(); + byte[] value = "my_value".getBytes(); + connection.set(key, value); + + connection.openPipeline(); + connection.get(key); + connection.del(key); + + List results = connection.closePipeline(); + byte[] val = (byte[])results.get(0); + assertThat(val).isEqualTo(value); + Long res = (Long) results.get(1); + assertThat(res).isEqualTo(1); + } + + @Test + public void testEcho() { + RedissonConnection connection = new RedissonConnection(redisson); + connection.openPipeline(); + assertThat(connection.echo("test".getBytes())).isNull(); + assertThat(connection.closePipeline().iterator().next()).isEqualTo("test".getBytes()); + } + + @Test + public void testSetGet() { + RedissonConnection connection = new RedissonConnection(redisson); + connection.openPipeline(); + assertThat(connection.isPipelined()).isTrue(); + connection.set("key".getBytes(), "value".getBytes()); + assertThat(connection.get("key".getBytes())).isNull(); + + List result = connection.closePipeline(); + assertThat(connection.isPipelined()).isFalse(); + assertThat(result.get(0)).isEqualTo("value".getBytes()); + } + + @Test + public void testHSetGet() { + RedissonConnection connection = new RedissonConnection(redisson); + connection.openPipeline(); + assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isNull(); + assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isNull(); + + List result = connection.closePipeline(); + assertThat((Boolean)result.get(0)).isTrue(); + assertThat(result.get(1)).isEqualTo("value".getBytes()); + } + + +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java new file mode 100644 index 000000000..21d2a5e19 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java @@ -0,0 +1,166 @@ +package org.redisson.spring.data.connection; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.redisson.*; +import org.redisson.ClusterRunner.ClusterProcesses; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.api.RedissonClient; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.config.Config; +import org.redisson.config.SubscriptionMode; +import org.redisson.connection.balancer.RandomLoadBalancer; +import org.redisson.reactive.CommandReactiveService; +import org.springframework.data.redis.RedisSystemException; +import org.springframework.data.redis.connection.ReactiveRedisClusterConnection; + + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Arrays; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + +@RunWith(Parameterized.class) +public class RedissonReactiveClusterKeyCommandsTest extends BaseTest { + + @Parameterized.Parameters(name= "{index} - same slot = {0}; has ttl = {1}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {false, false}, + {true, false}, + {false, true}, + {true, true} + }); + } + + @Parameterized.Parameter(0) + public boolean sameSlot; + + @Parameterized.Parameter(1) + public boolean hasTtl; + + ByteBuffer originalKey = ByteBuffer.wrap("key".getBytes()); + ByteBuffer newKey = ByteBuffer.wrap("unset".getBytes()); + ByteBuffer value = ByteBuffer.wrap("value".getBytes()); + + private void testInClusterReactive(Consumer redissonCallback) { + testInCluster(c -> { + RedissonClient redisson = (RedissonClient) c.getNativeConnection(); + RedissonReactiveRedisClusterConnection connection = new RedissonReactiveRedisClusterConnection(((RedissonReactive) redisson.reactive()).getCommandExecutor()); + redissonCallback.accept(connection); + }); + } + + @Test + public void testRename() { + testInClusterReactive(connection -> { + connection.stringCommands().set(originalKey, value).block(); + + if (hasTtl) { + connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block(); + } + + Integer originalSlot = getSlotForKey(originalKey, (RedissonReactiveRedisClusterConnection) connection); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot), connection); + + Boolean response = connection.keyCommands().rename(originalKey, newKey).block(); + + assertThat(response).isTrue(); + + final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block(); + assertThat(newKeyValue).isEqualTo(value); + if (hasTtl) { + assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0); + } else { + assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1); + } + }); + } + + @Test + public void testRename_keyNotExist() { + testInClusterReactive(connection -> { + Integer originalSlot = getSlotForKey(originalKey, (RedissonReactiveRedisClusterConnection) connection); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot), connection); + + if (sameSlot) { + // This is a quirk of the implementation - since same-slot renames use the non-cluster version, + // the result is a Redis error. This behavior matches other spring-data-redis implementations + assertThatThrownBy(() -> connection.keyCommands().rename(originalKey, newKey).block()) + .isInstanceOf(RedisSystemException.class); + + } else { + Boolean response = connection.keyCommands().rename(originalKey, newKey).block(); + + assertThat(response).isTrue(); + + final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block(); + assertThat(newKeyValue).isEqualTo(null); + } + }); + } + + protected ByteBuffer getNewKeyForSlot(String originalKey, Integer targetSlot, ReactiveRedisClusterConnection connection) { + int counter = 0; + + ByteBuffer newKey = ByteBuffer.wrap((originalKey + counter).getBytes()); + + Integer newKeySlot = getSlotForKey(newKey, (RedissonReactiveRedisClusterConnection) connection); + + while(!newKeySlot.equals(targetSlot)) { + counter++; + newKey = ByteBuffer.wrap((originalKey + counter).getBytes()); + newKeySlot = getSlotForKey(newKey, (RedissonReactiveRedisClusterConnection) connection); + } + + return newKey; + } + + @Test + public void testRenameNX() { + testInClusterReactive(connection -> { + connection.stringCommands().set(originalKey, value).block(); + if (hasTtl) { + connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block(); + } + + Integer originalSlot = getSlotForKey(originalKey, (RedissonReactiveRedisClusterConnection) connection); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot), connection); + + Boolean result = connection.keyCommands().renameNX(originalKey, newKey).block(); + + assertThat(result).isTrue(); + assertThat(connection.stringCommands().get(newKey).block()).isEqualTo(value); + if (hasTtl) { + assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0); + } else { + assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1); + } + + connection.stringCommands().set(originalKey, value).block(); + + result = connection.keyCommands().renameNX(originalKey, newKey).block(); + + assertThat(result).isFalse(); + }); + } + + private Integer getTargetSlot(Integer originalSlot) { + return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1; + } + + private Integer getSlotForKey(ByteBuffer key, RedissonReactiveRedisClusterConnection connection) { + return (Integer) connection.read(null, StringCodec.INSTANCE, RedisCommands.KEYSLOT, key.array()).block(); + } + +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommandsTest.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommandsTest.java new file mode 100644 index 000000000..df7e9aef2 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommandsTest.java @@ -0,0 +1,21 @@ +package org.redisson.spring.data.connection; + +import org.junit.Test; +import org.springframework.data.redis.core.ReactiveStringRedisTemplate; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RedissonReactiveKeyCommandsTest extends BaseConnectionTest { + + @Test + public void testExpiration() { + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + ReactiveStringRedisTemplate t = new ReactiveStringRedisTemplate(factory); + t.opsForValue().set("123", "4343").block(); + t.expire("123", Duration.ofMillis(1001)).block(); + assertThat(t.getExpire("123").block().toMillis()).isBetween(900L, 1000L); + } + +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonScriptReactiveTest.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonScriptReactiveTest.java new file mode 100644 index 000000000..2f68fbb93 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonScriptReactiveTest.java @@ -0,0 +1,33 @@ +package org.redisson.spring.data.connection; + +import org.junit.Test; +import org.springframework.data.redis.connection.ReactiveRedisConnection; +import org.springframework.data.redis.connection.ReturnType; +import reactor.core.publisher.Flux; + +import java.nio.ByteBuffer; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RedissonScriptReactiveTest extends BaseConnectionTest { + + @Test + public void testEval() { + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + ReactiveRedisConnection cc = factory.getReactiveConnection(); + + String s = "local ret = {}" + + "local mysqlKeys = {}" + + "table.insert(ret, 'test1')" + + "table.insert(ret, 'test2')" + + "table.insert(ret, 'test3')" + + "table.insert(ret, mysqlKeys)" + + "return ret"; + Flux> ss = cc.scriptingCommands().eval(ByteBuffer.wrap(s.getBytes()), ReturnType.MULTI, 0); + List r = ss.blockFirst(); + assertThat(r.get(2)).isEqualTo(ByteBuffer.wrap("test3".getBytes())); + assertThat((List) r.get(3)).isEmpty(); + } + +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonSentinelConnectionTest.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonSentinelConnectionTest.java new file mode 100644 index 000000000..b728c865b --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonSentinelConnectionTest.java @@ -0,0 +1,132 @@ +package org.redisson.spring.data.connection; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.util.Collection; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.redisson.RedisRunner; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.connection.balancer.RandomLoadBalancer; +import org.springframework.data.redis.connection.RedisSentinelConnection; +import org.springframework.data.redis.connection.RedisServer; + +public class RedissonSentinelConnectionTest { + + RedissonClient redisson; + RedisSentinelConnection connection; + RedisRunner.RedisProcess master; + RedisRunner.RedisProcess slave1; + RedisRunner.RedisProcess slave2; + RedisRunner.RedisProcess sentinel1; + RedisRunner.RedisProcess sentinel2; + RedisRunner.RedisProcess sentinel3; + + @Before + public void before() throws FailedToStartRedisException, IOException, InterruptedException { + master = new RedisRunner() + .nosave() + .randomDir() + .run(); + slave1 = new RedisRunner() + .port(6380) + .nosave() + .randomDir() + .slaveof("127.0.0.1", 6379) + .run(); + slave2 = new RedisRunner() + .port(6381) + .nosave() + .randomDir() + .slaveof("127.0.0.1", 6379) + .run(); + sentinel1 = new RedisRunner() + .nosave() + .randomDir() + .port(26379) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) + .run(); + sentinel2 = new RedisRunner() + .nosave() + .randomDir() + .port(26380) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) + .run(); + sentinel3 = new RedisRunner() + .nosave() + .randomDir() + .port(26381) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) + .run(); + + Thread.sleep(5000); + + Config config = new Config(); + config.useSentinelServers() + .setLoadBalancer(new RandomLoadBalancer()) + .addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster"); + redisson = Redisson.create(config); + + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + connection = factory.getSentinelConnection(); + } + + @After + public void after() { + sentinel1.stop(); + sentinel2.stop(); + sentinel3.stop(); + master.stop(); + slave1.stop(); + slave2.stop(); + + redisson.shutdown(); + } + + @Test + public void testMasters() { + Collection masters = connection.masters(); + assertThat(masters).hasSize(1); + } + + @Test + public void testSlaves() { + Collection masters = connection.masters(); + Collection slaves = connection.replicas(masters.iterator().next()); + assertThat(slaves).hasSize(2); + } + + @Test + public void testRemove() { + Collection masters = connection.masters(); + connection.remove(masters.iterator().next()); + } + + @Test + public void testMonitor() { + Collection masters = connection.masters(); + RedisServer master = masters.iterator().next(); + master.setName(master.getName() + ":"); + connection.monitor(master); + } + + @Test + public void testFailover() throws InterruptedException { + Collection masters = connection.masters(); + connection.failover(masters.iterator().next()); + + Thread.sleep(10000); + + RedisServer newMaster = connection.masters().iterator().next(); + assertThat(masters.iterator().next().getPort()).isNotEqualTo(newMaster.getPort()); + } +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonStreamTest.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonStreamTest.java new file mode 100644 index 000000000..1b7ebaa0a --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonStreamTest.java @@ -0,0 +1,191 @@ +package org.redisson.spring.data.connection; + +import mockit.Invocation; +import mockit.Mock; +import mockit.MockUp; +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.client.protocol.CommandData; +import org.redisson.connection.ClientConnectionsEntry; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.stream.*; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.data.redis.stream.StreamMessageListenerContainer; +import org.springframework.data.redis.stream.Subscription; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Nikita Koksharov + */ +public class RedissonStreamTest extends BaseConnectionTest { + + @Test + public void testReattachment() throws InterruptedException { + withSentinel((nodes, config) -> { + RedissonClient redissonClient = Redisson.create(config); + + RedisConnectionFactory redisConnectionFactory = new RedissonConnectionFactory(redissonClient); + + StreamMessageListenerContainer listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, getOptions()); + + + Consumer consumer = Consumer.from("group", "consumer1"); + StreamOffset streamOffset = StreamOffset.create("test", ReadOffset.from(">")); + String channel = "test"; + AtomicInteger counter = new AtomicInteger(); + Subscription subscription = listenerContainer.register(getReadRequest(consumer, streamOffset), + listener(redisConnectionFactory, channel, consumer, counter)); + + StringRedisTemplate t1 = new StringRedisTemplate(redisConnectionFactory); + t1.opsForStream().createGroup("test", "group"); + + listenerContainer.start(); + + AtomicReference invoked = new AtomicReference<>(); + + new MockUp() { + + @Mock + void reattachBlockingQueue(Invocation inv, CommandData commandData) { + try { + inv.proceed(commandData); + invoked.compareAndSet(null, true); + } catch (Exception e) { + e.printStackTrace(); + invoked.set(false); + throw e; + } + } + }; + + for (int i = 0; i < 3; i++) { + StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory); + ObjectRecord record = StreamRecords.newRecord() + .ofObject("message") + .withStreamKey(channel); + RecordId recordId = stringRedisTemplate.opsForStream().add(record); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + nodes.get(0).stop(); + try { + Thread.sleep(15000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + assertThat(invoked.get()).isTrue(); + + Assertions.assertThat(counter.get()).isEqualTo(3); + listenerContainer.stop(); + redissonClient.shutdown(); + }, 2); + } + + private StreamMessageListenerContainer.StreamMessageListenerContainerOptions> getOptions() { + return StreamMessageListenerContainer + .StreamMessageListenerContainerOptions + .builder() + .pollTimeout(Duration.ofSeconds(1)) + .targetType(String.class) + .build(); + } + + private StreamMessageListenerContainer.StreamReadRequest getReadRequest(Consumer consumer, StreamOffset streamOffset) { + return StreamMessageListenerContainer.StreamReadRequest + .builder(streamOffset) + .consumer(consumer) + .autoAcknowledge(false) + .cancelOnError((err) -> false) // do not stop consuming after error + .build(); + } + + private StreamListener listener(RedisConnectionFactory redisConnectionFactory, String channel, Consumer consumer, AtomicInteger counter) { + + return message -> { + try { + System.out.println("Acknowledging message: " + message.getId()); + StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory); + stringRedisTemplate.opsForStream().acknowledge(channel, consumer.getGroup(), message.getId()); + System.out.println("RECEIVED " + consumer + " " + message); + counter.incrementAndGet(); + } catch(Exception e) { + e.printStackTrace(); + } + }; + } + + @Test + public void testPending() { + connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup", ReadOffset.latest(), true); + + PendingMessages p = connection.streamCommands().xPending("test".getBytes(), Consumer.from("testGroup", "test1")); + assertThat(p.size()).isEqualTo(0); + + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("1".getBytes(), "1".getBytes())); + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("2".getBytes(), "2".getBytes())); + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("3".getBytes(), "3".getBytes())); + + List l = connection.streamCommands().xReadGroup(Consumer.from("testGroup", "test1"), StreamOffset.create("test".getBytes(), ReadOffset.from(">"))); + assertThat(l.size()).isEqualTo(3); + + PendingMessages p2 = connection.streamCommands().xPending("test".getBytes(), Consumer.from("testGroup", "test1")); + assertThat(p2.size()).isEqualTo(3); + } + + @Test + public void testGroups() { + connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup", ReadOffset.latest(), true); + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("1".getBytes(), "1".getBytes())); + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("2".getBytes(), "2".getBytes())); + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("3".getBytes(), "3".getBytes())); + + StreamInfo.XInfoGroups groups = connection.streamCommands().xInfoGroups("test".getBytes()); + assertThat(groups.size()).isEqualTo(1); + assertThat(groups.get(0).groupName()).isEqualTo("testGroup"); + assertThat(groups.get(0).pendingCount()).isEqualTo(0); + assertThat(groups.get(0).consumerCount()).isEqualTo(0); + assertThat(groups.get(0).lastDeliveredId()).isEqualTo("0-0"); + } + + @Test + public void testConsumers() { + connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup", ReadOffset.latest(), true); + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("1".getBytes(), "1".getBytes())); + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("2".getBytes(), "2".getBytes())); + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("3".getBytes(), "3".getBytes())); + + connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup2", ReadOffset.latest(), true); + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("1".getBytes(), "1".getBytes())); + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("2".getBytes(), "2".getBytes())); + connection.streamCommands().xAdd("test".getBytes(), Collections.singletonMap("3".getBytes(), "3".getBytes())); + + List list = connection.streamCommands().xReadGroup(Consumer.from("testGroup", "consumer1"), + StreamOffset.create("test".getBytes(), ReadOffset.lastConsumed())); + assertThat(list.size()).isEqualTo(6); + + StreamInfo.XInfoStream info = connection.streamCommands().xInfo("test".getBytes()); + assertThat(info.streamLength()).isEqualTo(6); + + StreamInfo.XInfoConsumers s1 = connection.streamCommands().xInfoConsumers("test".getBytes(), "testGroup"); + assertThat(s1.getConsumerCount()).isEqualTo(1); + assertThat(s1.get(0).consumerName()).isEqualTo("consumer1"); + assertThat(s1.get(0).pendingCount()).isEqualTo(6); + assertThat(s1.get(0).idleTimeMs()).isLessThan(100L); + assertThat(s1.get(0).groupName()).isEqualTo("testGroup"); + } + +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java new file mode 100644 index 000000000..fa6dd571a --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeReactiveTest.java @@ -0,0 +1,131 @@ +package org.redisson.spring.data.connection; + +import org.awaitility.Awaitility; +import org.awaitility.Durations; +import org.junit.Test; +import org.springframework.data.redis.connection.BitFieldSubCommands; +import org.springframework.data.redis.connection.ReactiveRedisConnection; +import org.springframework.data.redis.connection.ReactiveSubscription; +import org.springframework.data.redis.core.ReactiveStringRedisTemplate; +import org.springframework.data.redis.listener.ChannelTopic; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RedissonSubscribeReactiveTest extends BaseConnectionTest { + + @Test + public void testPubSub() { + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + AtomicLong counter = new AtomicLong(); + + ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory); + template.listenTo(ChannelTopic.of("test")).flatMap(message -> { + counter.incrementAndGet(); + return Mono.empty(); + }).subscribe(); + + for (int i = 0; i < 40; i++) { + ReactiveRedisConnection connection = factory.getReactiveConnection(); + connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block(); + } + + Awaitility.await().atMost(Durations.ONE_SECOND).untilAsserted(() -> { + assertThat(counter.get()).isEqualTo(40); + }); + } + + @Test + public void testTemplate() { + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + AtomicLong counter = new AtomicLong(); + + ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory); + template.listenTo(ChannelTopic.of("test")).flatMap(message -> { + counter.incrementAndGet(); + return Mono.empty(); + }).subscribe(); + + template.listenTo(ChannelTopic.of("test2")).flatMap(message -> { + counter.incrementAndGet(); + return Mono.empty(); + }).subscribe(); + + ReactiveRedisConnection connection = factory.getReactiveConnection(); + connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block(); + + Awaitility.await().atMost(Durations.ONE_SECOND) + .until(() -> counter.get() == 1); + + BitFieldSubCommands commands = BitFieldSubCommands.create() + .get(BitFieldSubCommands.BitFieldType.UINT_32).valueAt(0) + .get(BitFieldSubCommands.BitFieldType.UINT_32).valueAt(1) + .get(BitFieldSubCommands.BitFieldType.UINT_32).valueAt(2); + for (int i = 0; i < 128; i++) { + template.opsForValue().setBit("key", i, true); + } + + AtomicReference> result = new AtomicReference<>(); + template.opsForValue().bitField("key", commands).doOnNext(r -> { + result.set(r); + }).subscribe(); + + Awaitility.waitAtMost(Durations.FIVE_HUNDRED_MILLISECONDS).untilAsserted(() -> { + assertThat(result.get()).isEqualTo(Arrays.asList(0L, 0L, 0L)); + }); + } + + @Test + public void testSubscribe() { + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + ReactiveRedisConnection connection = factory.getReactiveConnection(); + Mono s = connection.pubSubCommands().createSubscription(); + AtomicReference msg = new AtomicReference(); + ReactiveSubscription ss = s.block(); + + ss.subscribe(ByteBuffer.wrap("test".getBytes())).block(); + ss.receive().doOnEach(message -> { + msg.set(message.get().getMessage().array()); + }).subscribe(); + + connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block(); + + Awaitility.await().atMost(Durations.ONE_SECOND) + .until(() -> Arrays.equals("msg".getBytes(), msg.get())); + + ss.unsubscribe(); + + connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block(); + } + + @Test + public void testUnSubscribe() { + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + ReactiveRedisConnection connection = factory.getReactiveConnection(); + Mono s = connection.pubSubCommands().createSubscription(); + AtomicReference msg = new AtomicReference(); + ReactiveSubscription ss = s.block(); + + ss.subscribe(ByteBuffer.wrap("test".getBytes())).block(); + ss.receive().doOnEach(message -> { + msg.set(message.get().getMessage().array()); + }).subscribe(); + + connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block(); + Awaitility.await().atMost(Durations.ONE_SECOND) + .until(() -> Arrays.equals("msg".getBytes(), msg.get())); + + ss.unsubscribe(); + + connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block(); + + + } + +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java new file mode 100644 index 000000000..e83987428 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java @@ -0,0 +1,290 @@ +package org.redisson.spring.data.connection; + +import org.awaitility.Awaitility; +import org.awaitility.Durations; +import org.junit.Test; +import org.redisson.ClusterRunner; +import org.redisson.RedisRunner; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.connection.balancer.RandomLoadBalancer; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RedissonSubscribeTest extends BaseConnectionTest { + + @Test + public void testContainer() { + RedissonConnectionFactory f = new RedissonConnectionFactory(redisson); + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(f); + container.afterPropertiesSet(); + container.start(); + +// for (int i = 0; i < 2; i++) { +// container.addMessageListener(new MessageListener() { +// @Override +// public void onMessage(Message message, byte[] pattern) { +// } +// }, ChannelTopic.of("test")); +// } +// +// container.stop(); +// +// container = new RedisMessageListenerContainer(); +// container.setConnectionFactory(f); +// container.afterPropertiesSet(); +// container.start(); +// for (int i = 0; i < 2; i++) { +// container.addMessageListener(new MessageListener() { +// @Override +// public void onMessage(Message message, byte[] pattern) { +// } +// }, PatternTopic.of("*")); +// } +// container.stop(); +// +// container= new RedisMessageListenerContainer(); +// container.setConnectionFactory(f); +// container.afterPropertiesSet(); +// container.start(); + for (int i = 0; i < 2; i++) { + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + } + }, ChannelTopic.of("test"+i)); + } + container.stop(); + + container= new RedisMessageListenerContainer(); + container.setConnectionFactory(f); + container.afterPropertiesSet(); + container.start(); + for (int i = 0; i < 2; i++) { + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + } + }, PatternTopic.of("*" + i)); + } + container.stop(); + } + + @Test + public void testCluster() throws IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave() .notifyKeyspaceEvents( + RedisRunner.KEYSPACE_EVENTS_OPTIONS.K, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.g, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.x, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.$) + ; + RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave() .notifyKeyspaceEvents( + RedisRunner.KEYSPACE_EVENTS_OPTIONS.K, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.g, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.x, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.$) + ; + RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave() .notifyKeyspaceEvents( + RedisRunner.KEYSPACE_EVENTS_OPTIONS.K, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.g, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.x, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.$) + ; + RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave() .notifyKeyspaceEvents( + RedisRunner.KEYSPACE_EVENTS_OPTIONS.K, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.g, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.x, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.$) + ; + RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave() .notifyKeyspaceEvents( + RedisRunner.KEYSPACE_EVENTS_OPTIONS.K, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.g, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.x, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.$) + ; + RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave() .notifyKeyspaceEvents( + RedisRunner.KEYSPACE_EVENTS_OPTIONS.K, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.g, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.x, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.$) + ; + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + ClusterRunner.ClusterProcesses process = clusterRunner.run(); + + Thread.sleep(5000); + + Config config = new Config(); + config.useClusterServers() + .setPingConnectionInterval(0) + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(factory); + Queue names = new ConcurrentLinkedQueue<>(); + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + names.add(new String(message.getBody())); + } + }, new PatternTopic("__keyevent@0__:expired")); + container.afterPropertiesSet(); + container.start(); + + factory.getConnection().setEx("EG:test:key1".getBytes(), 3, "123".getBytes()); + factory.getConnection().setEx("test:key2".getBytes(), 3, "123".getBytes()); + factory.getConnection().setEx("test:key1".getBytes(), 3, "123".getBytes()); + + Awaitility.await().atMost(Durations.FIVE_SECONDS).untilAsserted(() -> { + assertThat(names).containsExactlyInAnyOrder("EG:test:key1", "test:key2", "test:key1"); + }); + + redisson.shutdown(); + process.shutdown(); + } + + @Test + public void testListenersDuplication() { + Queue msg = new ConcurrentLinkedQueue<>(); + MessageListener aListener = (message, pattern) -> { + msg.add(message.getBody()); + }; + + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(factory); + container.addMessageListener(aListener, + Arrays.asList(new ChannelTopic("a"), new ChannelTopic("b"))); + container.addMessageListener(aListener, + Arrays.asList(new PatternTopic("c*"))); + container.afterPropertiesSet(); + container.start(); + + RedisConnection c = factory.getConnection(); + c.publish("a".getBytes(), "msg".getBytes()); + + Awaitility.await().atMost(Durations.ONE_SECOND) + .untilAsserted(() -> { + assertThat(msg).containsExactly("msg".getBytes()); + }); + } + + @Test + public void testPatterTopic() throws IOException, InterruptedException { + RedisRunner.RedisProcess instance = new RedisRunner() + .nosave() + .randomPort() + .randomDir() + .notifyKeyspaceEvents( + RedisRunner.KEYSPACE_EVENTS_OPTIONS.K, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.g, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.$) + .run(); + + Config config = new Config(); + config.useSingleServer().setAddress(instance.getRedisServerAddressAndPort()).setPingConnectionInterval(0); + RedissonClient redisson = Redisson.create(config); + + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(factory); + AtomicInteger counterTest = new AtomicInteger(); + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + counterTest.incrementAndGet(); + } + }, new PatternTopic("__keyspace@0__:mykey")); + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + counterTest.incrementAndGet(); + } + }, new PatternTopic("__keyevent@0__:del")); + container.afterPropertiesSet(); + container.start(); + assertThat(container.isRunning()).isTrue(); + + RedisConnection c = factory.getConnection(); + c.set("mykey".getBytes(), "2".getBytes()); + c.del("mykey".getBytes()); + + Awaitility.await().atMost(Durations.FIVE_SECONDS).until(() -> { + return counterTest.get() == 3; + }); + + container.stop(); + redisson.shutdown(); + } + + @Test + public void testSubscribe() { + RedissonConnection connection = new RedissonConnection(redisson); + AtomicReference msg = new AtomicReference(); + connection.subscribe(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + msg.set(message.getBody()); + } + }, "test".getBytes()); + + connection.publish("test".getBytes(), "msg".getBytes()); + Awaitility.await().atMost(Durations.ONE_SECOND) + .until(() -> Arrays.equals("msg".getBytes(), msg.get())); + + connection.getSubscription().unsubscribe(); + + connection.publish("test".getBytes(), "msg".getBytes()); + } + + @Test + public void testUnSubscribe() { + RedissonConnection connection = new RedissonConnection(redisson); + AtomicReference msg = new AtomicReference(); + connection.subscribe(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + msg.set(message.getBody()); + } + }, "test".getBytes()); + + connection.publish("test".getBytes(), "msg".getBytes()); + Awaitility.await().atMost(Durations.ONE_SECOND) + .until(() -> Arrays.equals("msg".getBytes(), msg.get())); + + connection.getSubscription().unsubscribe(); + + + } + +} diff --git a/redisson-spring-data/redisson-spring-data-33/src/test/resources/logback.xml b/redisson-spring-data/redisson-spring-data-33/src/test/resources/logback.xml new file mode 100644 index 000000000..f5cd8c8fb --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-33/src/test/resources/logback.xml @@ -0,0 +1,36 @@ + + + + + + + %d{yyyy.MM.dd HH:mm:ss.SSS} %-5level %c{0} : %msg%n + + + + + + + + + + + + +