From 6278653d05052eade4a4d19b5b82252693921a35 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 20 Feb 2024 11:42:10 +0300 Subject: [PATCH] Fixed - ReactiveKeyCommands.pExpire() throws an exception. #5640 --- redisson-spring-data/pom.xml | 6 + .../RedissonReactiveKeyCommands.java | 18 +- .../RedissonReactiveKeyCommands.java | 21 +- .../RedissonReactiveKeyCommands.java | 21 +- .../RedissonReactiveKeyCommands.java | 21 +- .../RedissonReactiveKeyCommands.java | 21 +- .../RedissonReactiveKeyCommands.java | 21 +- .../RedissonReactiveKeyCommands.java | 23 +- .../RedissonReactiveKeyCommands.java | 23 +- .../RedissonReactiveKeyCommands.java | 23 +- .../RedissonReactiveKeyCommands.java | 23 +- .../connection/RedisClusterNodeDecoder.java | 27 +- .../connection/RedissonClusterConnection.java | 11 +- .../RedissonReactiveKeyCommands.java | 24 +- ...edissonReactiveRedisClusterConnection.java | 9 +- .../src/test/java/org/redisson/BaseTest.java | 172 +++++--- .../RedissonClusterConnectionRenameTest.java | 130 ++---- .../RedissonClusterConnectionTest.java | 378 +++++++++--------- ...edissonReactiveClusterKeyCommandsTest.java | 155 +++---- .../RedissonReactiveKeyCommandsTest.java | 21 + 20 files changed, 553 insertions(+), 595 deletions(-) create mode 100644 redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommandsTest.java diff --git a/redisson-spring-data/pom.xml b/redisson-spring-data/pom.xml index 0c8477fc1..ee5f7dcd1 100644 --- a/redisson-spring-data/pom.xml +++ b/redisson-spring-data/pom.xml @@ -66,6 +66,12 @@ test + + org.testcontainers + junit-jupiter + test + + ch.qos.logback logback-classic diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java index 13d95c3cb..69703246b 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java @@ -15,11 +15,6 @@ */ package org.redisson.spring.data.connection; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.List; -import java.util.stream.Collectors; - import org.reactivestreams.Publisher; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -30,16 +25,15 @@ import org.redisson.client.protocol.convertor.Convertor; import org.redisson.reactive.CommandReactiveExecutor; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.ReactiveKeyCommands; -import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; -import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; +import org.springframework.data.redis.connection.ReactiveRedisConnection.*; import org.springframework.util.Assert; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Collectors; + /** * * @author Nikita Koksharov @@ -180,7 +174,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements Assert.notNull(command.getKey(), "Key must not be null!"); byte[] keyBuf = toByteArray(command.getKey()); - Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf); + Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis()); return m.map(v -> new BooleanResponse<>(command, v)); }); } diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java index 7be717953..9cbea84d2 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java @@ -15,12 +15,6 @@ */ package org.redisson.spring.data.connection; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - import org.reactivestreams.Publisher; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -32,18 +26,19 @@ import org.redisson.reactive.CommandReactiveExecutor; import org.redisson.reactive.RedissonKeysReactive; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.ReactiveKeyCommands; -import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; -import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; +import org.springframework.data.redis.connection.ReactiveRedisConnection.*; import org.springframework.data.redis.connection.ValueEncoding; import org.springframework.data.redis.core.ScanOptions; import org.springframework.util.Assert; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + /** * * @author Nikita Koksharov @@ -228,7 +223,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements Assert.notNull(command.getKey(), "Key must not be null!"); byte[] keyBuf = toByteArray(command.getKey()); - Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf); + Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis()); return m.map(v -> new BooleanResponse<>(command, v)); }); } diff --git a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java index 7be717953..9cbea84d2 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java @@ -15,12 +15,6 @@ */ package org.redisson.spring.data.connection; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - import org.reactivestreams.Publisher; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -32,18 +26,19 @@ import org.redisson.reactive.CommandReactiveExecutor; import org.redisson.reactive.RedissonKeysReactive; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.ReactiveKeyCommands; -import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; -import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; +import org.springframework.data.redis.connection.ReactiveRedisConnection.*; import org.springframework.data.redis.connection.ValueEncoding; import org.springframework.data.redis.core.ScanOptions; import org.springframework.util.Assert; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + /** * * @author Nikita Koksharov @@ -228,7 +223,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements Assert.notNull(command.getKey(), "Key must not be null!"); byte[] keyBuf = toByteArray(command.getKey()); - Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf); + Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis()); return m.map(v -> new BooleanResponse<>(command, v)); }); } diff --git a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java index 7be717953..9cbea84d2 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java @@ -15,12 +15,6 @@ */ package org.redisson.spring.data.connection; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - import org.reactivestreams.Publisher; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -32,18 +26,19 @@ import org.redisson.reactive.CommandReactiveExecutor; import org.redisson.reactive.RedissonKeysReactive; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.ReactiveKeyCommands; -import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; -import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; +import org.springframework.data.redis.connection.ReactiveRedisConnection.*; import org.springframework.data.redis.connection.ValueEncoding; import org.springframework.data.redis.core.ScanOptions; import org.springframework.util.Assert; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + /** * * @author Nikita Koksharov @@ -228,7 +223,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements Assert.notNull(command.getKey(), "Key must not be null!"); byte[] keyBuf = toByteArray(command.getKey()); - Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf); + Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis()); return m.map(v -> new BooleanResponse<>(command, v)); }); } diff --git a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java index 7be717953..9cbea84d2 100644 --- a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java @@ -15,12 +15,6 @@ */ package org.redisson.spring.data.connection; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - import org.reactivestreams.Publisher; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -32,18 +26,19 @@ import org.redisson.reactive.CommandReactiveExecutor; import org.redisson.reactive.RedissonKeysReactive; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.ReactiveKeyCommands; -import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; -import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; +import org.springframework.data.redis.connection.ReactiveRedisConnection.*; import org.springframework.data.redis.connection.ValueEncoding; import org.springframework.data.redis.core.ScanOptions; import org.springframework.util.Assert; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + /** * * @author Nikita Koksharov @@ -228,7 +223,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements Assert.notNull(command.getKey(), "Key must not be null!"); byte[] keyBuf = toByteArray(command.getKey()); - Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf); + Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis()); return m.map(v -> new BooleanResponse<>(command, v)); }); } diff --git a/redisson-spring-data/redisson-spring-data-25/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java b/redisson-spring-data/redisson-spring-data-25/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java index 7be717953..9cbea84d2 100644 --- a/redisson-spring-data/redisson-spring-data-25/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-25/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java @@ -15,12 +15,6 @@ */ package org.redisson.spring.data.connection; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - import org.reactivestreams.Publisher; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -32,18 +26,19 @@ import org.redisson.reactive.CommandReactiveExecutor; import org.redisson.reactive.RedissonKeysReactive; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.ReactiveKeyCommands; -import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; -import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; +import org.springframework.data.redis.connection.ReactiveRedisConnection.*; import org.springframework.data.redis.connection.ValueEncoding; import org.springframework.data.redis.core.ScanOptions; import org.springframework.util.Assert; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + /** * * @author Nikita Koksharov @@ -228,7 +223,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements Assert.notNull(command.getKey(), "Key must not be null!"); byte[] keyBuf = toByteArray(command.getKey()); - Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf); + Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis()); return m.map(v -> new BooleanResponse<>(command, v)); }); } diff --git a/redisson-spring-data/redisson-spring-data-26/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java b/redisson-spring-data/redisson-spring-data-26/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java index d752c36a9..956010480 100644 --- a/redisson-spring-data/redisson-spring-data-26/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-26/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java @@ -15,13 +15,6 @@ */ package org.redisson.spring.data.connection; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - import org.reactivestreams.Publisher; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -33,18 +26,20 @@ import org.redisson.reactive.CommandReactiveExecutor; import org.redisson.reactive.RedissonKeysReactive; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.ReactiveKeyCommands; -import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; -import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; +import org.springframework.data.redis.connection.ReactiveRedisConnection.*; import org.springframework.data.redis.connection.ValueEncoding; import org.springframework.data.redis.core.ScanOptions; import org.springframework.util.Assert; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + /** * * @author Nikita Koksharov @@ -229,7 +224,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements Assert.notNull(command.getKey(), "Key must not be null!"); byte[] keyBuf = toByteArray(command.getKey()); - Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf); + Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis()); return m.map(v -> new BooleanResponse<>(command, v)); }); } diff --git a/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java b/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java index d752c36a9..956010480 100644 --- a/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java @@ -15,13 +15,6 @@ */ package org.redisson.spring.data.connection; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - import org.reactivestreams.Publisher; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -33,18 +26,20 @@ import org.redisson.reactive.CommandReactiveExecutor; import org.redisson.reactive.RedissonKeysReactive; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.ReactiveKeyCommands; -import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; -import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; +import org.springframework.data.redis.connection.ReactiveRedisConnection.*; import org.springframework.data.redis.connection.ValueEncoding; import org.springframework.data.redis.core.ScanOptions; import org.springframework.util.Assert; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + /** * * @author Nikita Koksharov @@ -229,7 +224,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements Assert.notNull(command.getKey(), "Key must not be null!"); byte[] keyBuf = toByteArray(command.getKey()); - Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf); + Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis()); return m.map(v -> new BooleanResponse<>(command, v)); }); } diff --git a/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java b/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java index 26ff6aa16..1f1d3140a 100644 --- a/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java @@ -15,13 +15,6 @@ */ package org.redisson.spring.data.connection; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - import org.reactivestreams.Publisher; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -33,18 +26,20 @@ import org.redisson.reactive.CommandReactiveExecutor; import org.redisson.reactive.RedissonKeysReactive; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.ReactiveKeyCommands; -import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; -import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; +import org.springframework.data.redis.connection.ReactiveRedisConnection.*; import org.springframework.data.redis.connection.ValueEncoding; import org.springframework.data.redis.core.ScanOptions; import org.springframework.util.Assert; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + /** * * @author Nikita Koksharov @@ -229,7 +224,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements Assert.notNull(command.getKey(), "Key must not be null!"); byte[] keyBuf = toByteArray(command.getKey()); - Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf); + Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis()); return m.map(v -> new BooleanResponse<>(command, v)); }); } diff --git a/redisson-spring-data/redisson-spring-data-31/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java b/redisson-spring-data/redisson-spring-data-31/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java index 26ff6aa16..1f1d3140a 100644 --- a/redisson-spring-data/redisson-spring-data-31/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-31/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java @@ -15,13 +15,6 @@ */ package org.redisson.spring.data.connection; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - import org.reactivestreams.Publisher; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -33,18 +26,20 @@ import org.redisson.reactive.CommandReactiveExecutor; import org.redisson.reactive.RedissonKeysReactive; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.ReactiveKeyCommands; -import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; -import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; +import org.springframework.data.redis.connection.ReactiveRedisConnection.*; import org.springframework.data.redis.connection.ValueEncoding; import org.springframework.data.redis.core.ScanOptions; import org.springframework.util.Assert; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + /** * * @author Nikita Koksharov @@ -229,7 +224,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements Assert.notNull(command.getKey(), "Key must not be null!"); byte[] keyBuf = toByteArray(command.getKey()); - Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf); + Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis()); return m.map(v -> new BooleanResponse<>(command, v)); }); } diff --git a/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedisClusterNodeDecoder.java b/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedisClusterNodeDecoder.java index 59cd5798f..8c3304e06 100644 --- a/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedisClusterNodeDecoder.java +++ b/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedisClusterNodeDecoder.java @@ -15,15 +15,11 @@ */ package org.redisson.spring.data.connection; -import java.io.IOException; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; +import org.redisson.connection.ServiceManager; import org.redisson.misc.RedisURI; import org.springframework.data.redis.connection.RedisClusterNode; import org.springframework.data.redis.connection.RedisClusterNode.Flag; @@ -32,8 +28,8 @@ import org.springframework.data.redis.connection.RedisClusterNode.RedisClusterNo import org.springframework.data.redis.connection.RedisClusterNode.SlotRange; import org.springframework.data.redis.connection.RedisNode.NodeType; -import io.netty.buffer.ByteBuf; -import io.netty.util.CharsetUtil; +import java.io.IOException; +import java.util.*; /** * @@ -42,6 +38,12 @@ import io.netty.util.CharsetUtil; */ public class RedisClusterNodeDecoder implements Decoder> { + private final ServiceManager serviceManager; + + public RedisClusterNodeDecoder(ServiceManager serviceManager) { + this.serviceManager = serviceManager; + } + @Override public List decode(ByteBuf buf, State state) throws IOException { String response = buf.toString(CharsetUtil.UTF_8); @@ -63,7 +65,14 @@ public class RedisClusterNodeDecoder implements Decoder> RedisURI address = null; if (!flags.contains(Flag.NOADDR)) { String addr = params[1].split("@")[0]; + String name = addr.substring(0, addr.lastIndexOf(":")); + if (name.isEmpty()) { + // skip nodes with empty address + continue; + } + address = new RedisURI("redis://" + addr); + address = serviceManager.toURI("redis", address.getHost(), String.valueOf(address.getPort())); } String masterId = params[3]; diff --git a/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java index 4138f75b1..32e02fc26 100644 --- a/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -15,7 +15,6 @@ */ package org.redisson.spring.data.connection; -import io.netty.util.CharsetUtil; import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; @@ -57,16 +56,16 @@ import java.util.stream.Collectors; */ public class RedissonClusterConnection extends RedissonConnection implements RedisClusterConnection, DefaultedRedisClusterConnection { - private static final RedisStrictCommand> CLUSTER_NODES = - new RedisStrictCommand>("CLUSTER", "NODES", new ObjectDecoder(new RedisClusterNodeDecoder())); - public RedissonClusterConnection(RedissonClient redisson) { super(redisson); } @Override public Iterable clusterGetNodes() { - return read(null, StringCodec.INSTANCE, CLUSTER_NODES); + RedisStrictCommand> cluster + = new RedisStrictCommand>("CLUSTER", "NODES", + new ObjectDecoder(new RedisClusterNodeDecoder(executorService.getServiceManager()))); + return read(null, StringCodec.INSTANCE, cluster); } @Override @@ -100,7 +99,7 @@ public class RedissonClusterConnection extends RedissonConnection implements Red public Map> clusterGetMasterReplicaMap() { Iterable res = clusterGetNodes(); - Set masters = new HashSet(); + Set masters = new HashSet<>(); for (Iterator iterator = res.iterator(); iterator.hasNext();) { RedisClusterNode redisClusterNode = iterator.next(); if (redisClusterNode.isMaster()) { diff --git a/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java b/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java index 26ff6aa16..9fda6383d 100644 --- a/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommands.java @@ -15,13 +15,6 @@ */ package org.redisson.spring.data.connection; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - import org.reactivestreams.Publisher; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -33,18 +26,20 @@ import org.redisson.reactive.CommandReactiveExecutor; import org.redisson.reactive.RedissonKeysReactive; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.ReactiveKeyCommands; -import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; -import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse; -import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; +import org.springframework.data.redis.connection.ReactiveRedisConnection.*; import org.springframework.data.redis.connection.ValueEncoding; import org.springframework.data.redis.core.ScanOptions; import org.springframework.util.Assert; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + /** * * @author Nikita Koksharov @@ -227,9 +222,10 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements return execute(commands, command -> { Assert.notNull(command.getKey(), "Key must not be null!"); + Assert.notNull(command.getTimeout(), "Timeout must not be null!"); byte[] keyBuf = toByteArray(command.getKey()); - Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf); + Mono m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis()); return m.map(v -> new BooleanResponse<>(command, v)); }); } diff --git a/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonReactiveRedisClusterConnection.java b/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonReactiveRedisClusterConnection.java index 5c40417a0..fb3ea7464 100644 --- a/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonReactiveRedisClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonReactiveRedisClusterConnection.java @@ -102,12 +102,13 @@ public class RedissonReactiveRedisClusterConnection extends RedissonReactiveRedi return execute(node, RedisCommands.PING); } - private static final RedisStrictCommand> CLUSTER_NODES = - new RedisStrictCommand<>("CLUSTER", "NODES", new ObjectDecoder(new RedisClusterNodeDecoder())); - @Override public Flux clusterGetNodes() { - Mono> result = read(null, StringCodec.INSTANCE, CLUSTER_NODES); + RedisStrictCommand> cluster + = new RedisStrictCommand>("CLUSTER", "NODES", + new ObjectDecoder(new RedisClusterNodeDecoder(executorService.getServiceManager()))); + + Mono> result = read(null, StringCodec.INSTANCE, cluster); return result.flatMapMany(e -> Flux.fromIterable(e)); } diff --git a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/BaseTest.java b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/BaseTest.java index 4e73334f1..0f00625d8 100644 --- a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/BaseTest.java +++ b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/BaseTest.java @@ -1,86 +1,140 @@ package org.redisson; -import java.io.IOException; - -import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; +import org.redisson.api.NatMapper; import org.redisson.api.RedissonClient; import org.redisson.config.Config; +import org.redisson.config.Protocol; +import org.redisson.misc.RedisURI; +import org.redisson.spring.data.connection.RedissonClusterConnection; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; public abstract class BaseTest { - - protected RedissonClient redisson; - protected static RedissonClient defaultRedisson; - @BeforeClass - public static void beforeClass() throws IOException, InterruptedException { - if (!RedissonRuntimeEnvironment.isTravis) { - RedisRunner.startDefaultRedisServerInstance(); - defaultRedisson = createInstance(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - defaultRedisson.shutdown(); - try { - RedisRunner.shutDownDefaultRedisServerInstance(); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - }); - } + protected static final String NOTIFY_KEYSPACE_EVENTS = "--notify-keyspace-events"; + + private static final GenericContainer REDIS = createRedis(); + + protected static final Protocol protocol = Protocol.RESP2; + + protected static RedissonClient redisson; + + protected static RedissonClient redissonCluster; + + private static GenericContainer REDIS_CLUSTER; + + protected static GenericContainer createRedis() { + return createRedis("7.2"); } - @Before - public void before() throws IOException, InterruptedException { - if (RedissonRuntimeEnvironment.isTravis) { - RedisRunner.startDefaultRedisServerInstance(); - redisson = createInstance(); - } else { - if (redisson == null) { - redisson = defaultRedisson; - } - if (flushBetweenTests()) { - redisson.getKeys().flushall(); - } - } + protected static GenericContainer createRedis(String version) { + return new GenericContainer<>("redis:" + version) + .withCreateContainerCmdModifier(cmd -> { + cmd.withCmd("redis-server", "--save", "''"); + }) + .withExposedPorts(6379); } - @After - public void after() throws InterruptedException { - if (RedissonRuntimeEnvironment.isTravis) { - redisson.shutdown(); - RedisRunner.shutDownDefaultRedisServerInstance(); + @BeforeClass + public static void beforeAll() { + if (redisson == null) { + REDIS.start(); + Config config = createConfig(); + redisson = Redisson.create(config); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + redisson.shutdown(); + REDIS.stop(); + if (redissonCluster != null) { + redissonCluster.shutdown(); + redissonCluster = null; + } + if (REDIS_CLUSTER != null) { + REDIS_CLUSTER.stop(); + REDIS_CLUSTER = null; + } + })); } } - public static Config createConfig() { -// String redisAddress = System.getProperty("redisAddress"); -// if (redisAddress == null) { -// redisAddress = "127.0.0.1:6379"; -// } + protected static Config createConfig() { Config config = new Config(); -// config.setCodec(new MsgPackJacksonCodec()); -// config.useSentinelServers().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26379", "127.0.0.1:26389"); -// config.useClusterServers().addNodeAddress("127.0.0.1:7004", "127.0.0.1:7001", "127.0.0.1:7000"); + config.setProtocol(protocol); config.useSingleServer() - .setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); -// .setPassword("mypass1"); -// config.useMasterSlaveConnection() -// .setMasterAddress("127.0.0.1:6379") -// .addSlaveAddress("127.0.0.1:6399") -// .addSlaveAddress("127.0.0.1:6389"); + .setAddress("redis://127.0.0.1:" + REDIS.getFirstMappedPort()); return config; } - public static RedissonClient createInstance() { + protected static RedissonClient createInstance() { Config config = createConfig(); return Redisson.create(config); } - protected boolean flushBetweenTests() { - return true; + protected void testWithParams(Consumer redissonCallback, String... params) { + GenericContainer redis = + new GenericContainer<>("redis:7.2") + .withCreateContainerCmdModifier(cmd -> { + List args = new ArrayList<>(); + args.add("redis-server"); + args.addAll(Arrays.asList(params)); + cmd.withCmd(args); + }) + .withExposedPorts(6379); + redis.start(); + + Config config = new Config(); + config.setProtocol(protocol); + config.useSingleServer().setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort()); + RedissonClient redisson = Redisson.create(config); + + try { + redissonCallback.accept(redisson); + } finally { + redisson.shutdown(); + redis.stop(); + } + + } + + protected void testInCluster(Consumer redissonCallback) { + if (redissonCluster == null) { + REDIS_CLUSTER = new GenericContainer<>("vishnunair/docker-redis-cluster") + .withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384) + .withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(15))); + REDIS_CLUSTER.start(); + + Config config = new Config(); + config.setProtocol(protocol); + config.useClusterServers() + .setNatMapper(new NatMapper() { + @Override + public RedisURI map(RedisURI uri) { + if (REDIS_CLUSTER.getMappedPort(uri.getPort()) == null) { + return uri; + } + return new RedisURI(uri.getScheme(), REDIS_CLUSTER.getHost(), REDIS_CLUSTER.getMappedPort(uri.getPort())); + } + }) + .addNodeAddress("redis://127.0.0.1:" + REDIS_CLUSTER.getFirstMappedPort()); + redissonCluster = Redisson.create(config); + } + + redissonCallback.accept(new RedissonClusterConnection(redissonCluster)); + } + + @Before + public void beforeEach() { + redisson.getKeys().flushall(); + if (redissonCluster != null) { + redissonCluster.getKeys().flushall(); + } } } diff --git a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java index ed8a07bb7..c3233eb53 100644 --- a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java +++ b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java @@ -1,31 +1,19 @@ package org.redisson.spring.data.connection; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.redisson.ClusterRunner; -import org.redisson.ClusterRunner.ClusterProcesses; -import org.redisson.RedisRunner; -import org.redisson.RedisRunner.FailedToStartRedisException; -import org.redisson.Redisson; -import org.redisson.api.RedissonClient; -import org.redisson.config.Config; -import org.redisson.config.SubscriptionMode; -import org.redisson.connection.balancer.RandomLoadBalancer; +import org.redisson.BaseTest; import org.springframework.dao.InvalidDataAccessResourceUsageException; -import java.io.IOException; import java.util.Arrays; -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; @RunWith(Parameterized.class) -public class RedissonClusterConnectionRenameTest { +public class RedissonClusterConnectionRenameTest extends BaseTest { @Parameterized.Parameters(name= "{index} - same slot = {0}") public static Iterable data() { @@ -38,79 +26,41 @@ public class RedissonClusterConnectionRenameTest { @Parameterized.Parameter(0) public boolean sameSlot; - static RedissonClient redisson; - static RedissonClusterConnection connection; - static ClusterProcesses process; - byte[] originalKey = "key".getBytes(); byte[] newKey = "unset".getBytes(); byte[] value = "value".getBytes(); - @BeforeClass - public static void before() throws FailedToStartRedisException, IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); - - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setSubscriptionMode(SubscriptionMode.SLAVE) - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - - redisson = Redisson.create(config); - connection = new RedissonClusterConnection(redisson); - } - - @AfterClass - public static void after() { - process.shutdown(); - redisson.shutdown(); - } - - @After - public void cleanup() { - connection.del(originalKey); - connection.del(newKey); - } - @Test public void testRename() { - connection.set(originalKey, value); - connection.expire(originalKey, 1000); + testInCluster(connection -> { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); - Integer originalSlot = connection.clusterGetSlotForKey(originalKey); - newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection); - connection.rename(originalKey, newKey); + connection.rename(originalKey, newKey); - assertThat(connection.get(newKey)).isEqualTo(value); - assertThat(connection.ttl(newKey)).isGreaterThan(0); + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + }); } @Test public void testRename_pipeline() { - connection.set(originalKey, value); + testInCluster(connection -> { + connection.set(originalKey, value); - Integer originalSlot = connection.clusterGetSlotForKey(originalKey); - newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection); - connection.openPipeline(); - assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); - connection.closePipeline(); + connection.openPipeline(); + assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + }); } - protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot) { + protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot, RedissonClusterConnection connection) { int counter = 0; byte[] newKey = (new String(originalKey) + counter).getBytes(); @@ -128,35 +78,39 @@ public class RedissonClusterConnectionRenameTest { @Test public void testRenameNX() { - connection.set(originalKey, value); - connection.expire(originalKey, 1000); + testInCluster(connection -> { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); - Integer originalSlot = connection.clusterGetSlotForKey(originalKey); - newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection); - Boolean result = connection.renameNX(originalKey, newKey); + Boolean result = connection.renameNX(originalKey, newKey); - assertThat(connection.get(newKey)).isEqualTo(value); - assertThat(connection.ttl(newKey)).isGreaterThan(0); - assertThat(result).isTrue(); + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + assertThat(result).isTrue(); - connection.set(originalKey, value); + connection.set(originalKey, value); - result = connection.renameNX(originalKey, newKey); + result = connection.renameNX(originalKey, newKey); - assertThat(result).isFalse(); + assertThat(result).isFalse(); + }); } @Test public void testRenameNX_pipeline() { - connection.set(originalKey, value); + testInCluster(connection -> { + connection.set(originalKey, value); - Integer originalSlot = connection.clusterGetSlotForKey(originalKey); - newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection); - connection.openPipeline(); - assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); - connection.closePipeline(); + connection.openPipeline(); + assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + }); } private Integer getTargetSlot(Integer originalSlot) { diff --git a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java index e376ac34f..1777b8f5f 100644 --- a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java +++ b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java @@ -1,20 +1,10 @@ package org.redisson.spring.data.connection; import net.bytebuddy.utility.RandomString; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; -import org.redisson.ClusterRunner; -import org.redisson.ClusterRunner.ClusterProcesses; -import org.redisson.RedisRunner; -import org.redisson.RedisRunner.FailedToStartRedisException; -import org.redisson.Redisson; +import org.redisson.BaseTest; import org.redisson.api.RedissonClient; -import org.redisson.config.Config; -import org.redisson.config.ReadMode; -import org.redisson.config.SubscriptionMode; import org.redisson.connection.MasterSlaveConnectionManager; -import org.redisson.connection.balancer.RandomLoadBalancer; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.RedisClusterNode; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -24,288 +14,291 @@ import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.types.RedisClientInfo; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.*; import static org.assertj.core.api.Assertions.assertThat; -public class RedissonClusterConnectionTest { - - static RedissonClient redisson; - static RedissonClusterConnection connection; - static ClusterProcesses process; - - @BeforeClass - public static void before() throws FailedToStartRedisException, IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); - - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setReadMode(ReadMode.MASTER_SLAVE) - .setSubscriptionMode(SubscriptionMode.SLAVE) - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - - redisson = Redisson.create(config); - RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); - connection = (RedissonClusterConnection) factory.getConnection(); - } - - @AfterClass - public static void after() { - process.shutdown(); - redisson.shutdown(); - } +public class RedissonClusterConnectionTest extends BaseTest { @Test public void testRandomKey() { - StringRedisTemplate redisTemplate = new StringRedisTemplate(); - redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson)); - redisTemplate.afterPropertiesSet(); - - for (int i = 0; i < 10; i++) { - redisTemplate.opsForValue().set("i" + i, "i" + i); - } - - for (RedisClusterNode clusterNode : redisTemplate.getConnectionFactory().getClusterConnection().clusterGetNodes()) { - String key = redisTemplate.opsForCluster().randomKey(clusterNode); - assertThat(key).isNotNull(); - } + testInCluster(connection -> { + RedissonClient redisson = (RedissonClient) connection.getNativeConnection(); + StringRedisTemplate redisTemplate = new StringRedisTemplate(); + redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson)); + redisTemplate.afterPropertiesSet(); + + for (int i = 0; i < 10; i++) { + redisTemplate.opsForValue().set("i" + i, "i" + i); + } + + for (RedisClusterNode clusterNode : redisTemplate.getConnectionFactory().getClusterConnection().clusterGetNodes()) { + String key = redisTemplate.opsForCluster().randomKey(clusterNode); + assertThat(key).isNotNull(); + } + }); } @Test public void testDel() { - List keys = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - byte[] key = ("test" + i).getBytes(); - keys.add(key); - connection.set(key, ("test" + i).getBytes()); - } - assertThat(connection.del(keys.toArray(new byte[0][]))).isEqualTo(10); + testInCluster(connection -> { + List keys = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + byte[] key = ("test" + i).getBytes(); + keys.add(key); + connection.set(key, ("test" + i).getBytes()); + } + assertThat(connection.del(keys.toArray(new byte[0][]))).isEqualTo(10); + }); } @Test public void testScan() { - Map map = new HashMap<>(); - for (int i = 0; i < 10000; i++) { - map.put(RandomString.make(32).getBytes(), RandomString.make(32).getBytes(StandardCharsets.UTF_8)); - } - connection.mSet(map); - - Cursor b = connection.scan(ScanOptions.scanOptions().build()); - Set sett = new HashSet<>(); - int counter = 0; - while (b.hasNext()) { - byte[] tt = b.next(); - sett.add(new String(tt)); - counter++; - } - assertThat(sett.size()).isEqualTo(map.size()); - assertThat(counter).isEqualTo(map.size()); + testInCluster(connection -> { + Map map = new HashMap<>(); + for (int i = 0; i < 10000; i++) { + map.put(RandomString.make(32).getBytes(), RandomString.make(32).getBytes(StandardCharsets.UTF_8)); + } + connection.mSet(map); + + Cursor b = connection.scan(ScanOptions.scanOptions().build()); + Set sett = new HashSet<>(); + int counter = 0; + while (b.hasNext()) { + byte[] tt = b.next(); + sett.add(new String(tt)); + counter++; + } + assertThat(sett.size()).isEqualTo(map.size()); + assertThat(counter).isEqualTo(map.size()); + }); } @Test public void testMSet() { - Map map = new HashMap<>(); - for (int i = 0; i < 10; i++) { - map.put(("test" + i).getBytes(), ("test" + i*100).getBytes()); - } - connection.mSet(map); - for (Map.Entry entry : map.entrySet()) { - assertThat(connection.get(entry.getKey())).isEqualTo(entry.getValue()); - } + testInCluster(connection -> { + Map map = new HashMap<>(); + for (int i = 0; i < 10; i++) { + map.put(("test" + i).getBytes(), ("test" + i*100).getBytes()); + } + connection.mSet(map); + for (Map.Entry entry : map.entrySet()) { + assertThat(connection.get(entry.getKey())).isEqualTo(entry.getValue()); + } + }); } @Test public void testMGet() { - Map map = new HashMap<>(); - for (int i = 0; i < 10; i++) { - map.put(("test" + i).getBytes(), ("test" + i*100).getBytes()); - } - connection.mSet(map); - List r = connection.mGet(map.keySet().toArray(new byte[0][])); - assertThat(r).containsExactly(map.values().toArray(new byte[0][])); + testInCluster(connection -> { + Map map = new HashMap<>(); + for (int i = 0; i < 10; i++) { + map.put(("test" + i).getBytes(), ("test" + i*100).getBytes()); + } + connection.mSet(map); + List r = connection.mGet(map.keySet().toArray(new byte[0][])); + assertThat(r).containsExactly(map.values().toArray(new byte[0][])); + }); } @Test public void testClusterGetNodes() { - Iterable nodes = connection.clusterGetNodes(); - assertThat(nodes).hasSize(6); - for (RedisClusterNode redisClusterNode : nodes) { - assertThat(redisClusterNode.getLinkState()).isNotNull(); - assertThat(redisClusterNode.getFlags()).isNotEmpty(); - assertThat(redisClusterNode.getHost()).isNotNull(); - assertThat(redisClusterNode.getPort()).isNotNull(); - assertThat(redisClusterNode.getId()).isNotNull(); - assertThat(redisClusterNode.getType()).isNotNull(); - if (redisClusterNode.getType() == NodeType.MASTER) { - assertThat(redisClusterNode.getSlotRange().getSlots()).isNotEmpty(); - } else { - assertThat(redisClusterNode.getMasterId()).isNotNull(); + testInCluster(connection -> { + Iterable nodes = connection.clusterGetNodes(); + assertThat(nodes).hasSize(6); + for (RedisClusterNode redisClusterNode : nodes) { + assertThat(redisClusterNode.getLinkState()).isNotNull(); + assertThat(redisClusterNode.getFlags()).isNotEmpty(); + assertThat(redisClusterNode.getHost()).isNotNull(); + assertThat(redisClusterNode.getPort()).isNotNull(); + assertThat(redisClusterNode.getId()).isNotNull(); + assertThat(redisClusterNode.getType()).isNotNull(); + if (redisClusterNode.getType() == NodeType.MASTER) { + assertThat(redisClusterNode.getSlotRange().getSlots()).isNotEmpty(); + } else { + assertThat(redisClusterNode.getMasterId()).isNotNull(); + } } - } + }); } @Test public void testClusterGetNodesMaster() { - Iterable nodes = connection.clusterGetNodes(); - for (RedisClusterNode redisClusterNode : nodes) { - if (redisClusterNode.getType() == NodeType.MASTER) { - Collection slaves = connection.clusterGetReplicas(redisClusterNode); - assertThat(slaves).hasSize(1); + testInCluster(connection -> { + Iterable nodes = connection.clusterGetNodes(); + for (RedisClusterNode redisClusterNode : nodes) { + if (redisClusterNode.getType() == NodeType.MASTER) { + Collection slaves = connection.clusterGetReplicas(redisClusterNode); + assertThat(slaves).hasSize(1); + } } - } + }); } @Test public void testClusterGetMasterSlaveMap() { - Map> map = connection.clusterGetMasterReplicaMap(); - assertThat(map).hasSize(3); - for (Collection slaves : map.values()) { - assertThat(slaves).hasSize(1); - } + testInCluster(connection -> { + Map> map = connection.clusterGetMasterReplicaMap(); + assertThat(map).hasSize(3); + for (Collection slaves : map.values()) { + assertThat(slaves).hasSize(1); + } + }); } @Test public void testClusterGetSlotForKey() { - Integer slot = connection.clusterGetSlotForKey("123".getBytes()); - assertThat(slot).isNotNull(); + testInCluster(connection -> { + Integer slot = connection.clusterGetSlotForKey("123".getBytes()); + assertThat(slot).isNotNull(); + }); } @Test public void testClusterGetNodeForSlot() { - RedisClusterNode node1 = connection.clusterGetNodeForSlot(1); - RedisClusterNode node2 = connection.clusterGetNodeForSlot(16000); - assertThat(node1.getId()).isNotEqualTo(node2.getId()); + testInCluster(connection -> { + RedisClusterNode node1 = connection.clusterGetNodeForSlot(1); + RedisClusterNode node2 = connection.clusterGetNodeForSlot(16000); + assertThat(node1.getId()).isNotEqualTo(node2.getId()); + }); } @Test public void testClusterGetNodeForKey() { - RedisClusterNode node = connection.clusterGetNodeForKey("123".getBytes()); - assertThat(node).isNotNull(); + testInCluster(connection -> { + RedisClusterNode node = connection.clusterGetNodeForKey("123".getBytes()); + assertThat(node).isNotNull(); + }); } @Test public void testClusterGetClusterInfo() { - ClusterInfo info = connection.clusterGetClusterInfo(); - assertThat(info.getSlotsFail()).isEqualTo(0); - assertThat(info.getSlotsOk()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT); - assertThat(info.getSlotsAssigned()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT); + testInCluster(connection -> { + ClusterInfo info = connection.clusterGetClusterInfo(); + assertThat(info.getSlotsFail()).isEqualTo(0); + assertThat(info.getSlotsOk()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT); + assertThat(info.getSlotsAssigned()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT); + }); } @Test public void testClusterAddRemoveSlots() { - RedisClusterNode master = getFirstMaster(); - Integer slot = master.getSlotRange().getSlots().iterator().next(); - connection.clusterDeleteSlots(master, slot); - connection.clusterAddSlots(master, slot); + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + Integer slot = master.getSlotRange().getSlots().iterator().next(); + connection.clusterDeleteSlots(master, slot); + connection.clusterAddSlots(master, slot); + }); } @Test public void testClusterCountKeysInSlot() { - Long t = connection.clusterCountKeysInSlot(1); - assertThat(t).isZero(); - } - - @Test - public void testClusterMeetForget() { - RedisClusterNode master = getFirstMaster(); - connection.clusterForget(master); - connection.clusterMeet(master); + testInCluster(connection -> { + Long t = connection.clusterCountKeysInSlot(1); + assertThat(t).isZero(); + }); } @Test public void testClusterGetKeysInSlot() { - connection.flushAll(); - List keys = connection.clusterGetKeysInSlot(12, 10); - assertThat(keys).isEmpty(); + testInCluster(connection -> { + connection.flushAll(); + List keys = connection.clusterGetKeysInSlot(12, 10); + assertThat(keys).isEmpty(); + }); } @Test public void testClusterPing() { - RedisClusterNode master = getFirstMaster(); - String res = connection.ping(master); - assertThat(res).isEqualTo("PONG"); + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + String res = connection.ping(master); + assertThat(res).isEqualTo("PONG"); + }); } @Test public void testDbSize() { - connection.flushAll(); - RedisClusterNode master = getFirstMaster(); - Long size = connection.dbSize(master); - assertThat(size).isZero(); + testInCluster(connection -> { + connection.flushAll(); + RedisClusterNode master = getFirstMaster(connection); + Long size = connection.dbSize(master); + assertThat(size).isZero(); + }); } @Test public void testInfo() { - RedisClusterNode master = getFirstMaster(); - Properties info = connection.info(master); - assertThat(info.size()).isGreaterThan(10); + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + Properties info = connection.info(master); + assertThat(info.size()).isGreaterThan(10); + }); } @Test public void testDelPipeline() { - byte[] k = "key".getBytes(); - byte[] v = "val".getBytes(); - connection.set(k, v); - - connection.openPipeline(); - connection.get(k); - connection.del(k); - List results = connection.closePipeline(); - byte[] val = (byte[])results.get(0); - assertThat(val).isEqualTo(v); - Long res = (Long) results.get(1); - assertThat(res).isEqualTo(1); + testInCluster(connection -> { + byte[] k = "key".getBytes(); + byte[] v = "val".getBytes(); + connection.set(k, v); + + connection.openPipeline(); + connection.get(k); + connection.del(k); + List results = connection.closePipeline(); + byte[] val = (byte[])results.get(0); + assertThat(val).isEqualTo(v); + Long res = (Long) results.get(1); + assertThat(res).isEqualTo(1); + }); } @Test public void testResetConfigStats() { - RedisClusterNode master = getFirstMaster(); - connection.resetConfigStats(master); + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + connection.resetConfigStats(master); + }); } @Test public void testTime() { - RedisClusterNode master = getFirstMaster(); - Long time = connection.time(master); - assertThat(time).isGreaterThan(1000); + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + Long time = connection.time(master); + assertThat(time).isGreaterThan(1000); + }); } @Test public void testGetClientList() { - RedisClusterNode master = getFirstMaster(); - List list = connection.getClientList(master); - assertThat(list.size()).isGreaterThan(10); + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + List list = connection.getClientList(master); + assertThat(list.size()).isGreaterThan(10); + }); } @Test public void testSetConfig() { - RedisClusterNode master = getFirstMaster(); - connection.setConfig(master, "timeout", "10"); + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + connection.setConfig(master, "timeout", "10"); + }); } @Test public void testGetConfig() { - RedisClusterNode master = getFirstMaster(); - Properties config = connection.getConfig(master, "*"); - assertThat(config.size()).isGreaterThan(20); + testInCluster(connection -> { + RedisClusterNode master = getFirstMaster(connection); + Properties config = connection.getConfig(master, "*"); + assertThat(config.size()).isGreaterThan(20); + }); } - protected RedisClusterNode getFirstMaster() { + protected RedisClusterNode getFirstMaster(RedissonClusterConnection connection) { Map> map = connection.clusterGetMasterReplicaMap(); RedisClusterNode master = map.keySet().iterator().next(); return master; @@ -313,9 +306,12 @@ public class RedissonClusterConnectionTest { @Test public void testConnectionFactoryReturnsClusterConnection() { - RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson); + testInCluster(connection -> { + RedissonClient redisson = (RedissonClient) connection.getNativeConnection(); + RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson); - assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class); + assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class); + }); } } diff --git a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java index c77beb3ef..21d2a5e19 100644 --- a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java +++ b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java @@ -17,19 +17,21 @@ import org.redisson.config.SubscriptionMode; import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.reactive.CommandReactiveService; import org.springframework.data.redis.RedisSystemException; +import org.springframework.data.redis.connection.ReactiveRedisClusterConnection; import java.io.IOException; import java.nio.ByteBuffer; import java.time.Duration; import java.util.Arrays; +import java.util.function.Consumer; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; @RunWith(Parameterized.class) -public class RedissonReactiveClusterKeyCommandsTest { +public class RedissonReactiveClusterKeyCommandsTest extends BaseTest { @Parameterized.Parameters(name= "{index} - same slot = {0}; has ttl = {1}") public static Iterable data() { @@ -47,109 +49,78 @@ public class RedissonReactiveClusterKeyCommandsTest { @Parameterized.Parameter(1) public boolean hasTtl; - static RedissonClient redisson; - static RedissonReactiveRedisClusterConnection connection; - static ClusterProcesses process; - ByteBuffer originalKey = ByteBuffer.wrap("key".getBytes()); ByteBuffer newKey = ByteBuffer.wrap("unset".getBytes()); ByteBuffer value = ByteBuffer.wrap("value".getBytes()); - @BeforeClass - public static void before() throws FailedToStartRedisException, IOException, InterruptedException { - RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); - RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); - - - ClusterRunner clusterRunner = new ClusterRunner() - .addNode(master1, slave1) - .addNode(master2, slave2) - .addNode(master3, slave3); - process = clusterRunner.run(); - - Config config = new Config(); - config.useClusterServers() - .setSubscriptionMode(SubscriptionMode.SLAVE) - .setLoadBalancer(new RandomLoadBalancer()) - .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); - - redisson = Redisson.create(config); - connection = new RedissonReactiveRedisClusterConnection(((RedissonReactive)redisson.reactive()).getCommandExecutor()); - } - - @AfterClass - public static void after() { - process.shutdown(); - redisson.shutdown(); - } - - @After - public void cleanup() { - connection.keyCommands().del(originalKey) - .and(connection.keyCommands().del(newKey)) - .block(); + private void testInClusterReactive(Consumer redissonCallback) { + testInCluster(c -> { + RedissonClient redisson = (RedissonClient) c.getNativeConnection(); + RedissonReactiveRedisClusterConnection connection = new RedissonReactiveRedisClusterConnection(((RedissonReactive) redisson.reactive()).getCommandExecutor()); + redissonCallback.accept(connection); + }); } @Test public void testRename() { - connection.stringCommands().set(originalKey, value).block(); + testInClusterReactive(connection -> { + connection.stringCommands().set(originalKey, value).block(); - if (hasTtl) { - connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block(); - } + if (hasTtl) { + connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block(); + } - Integer originalSlot = getSlotForKey(originalKey); - newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + Integer originalSlot = getSlotForKey(originalKey, (RedissonReactiveRedisClusterConnection) connection); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot), connection); - Boolean response = connection.keyCommands().rename(originalKey, newKey).block(); + Boolean response = connection.keyCommands().rename(originalKey, newKey).block(); - assertThat(response).isTrue(); + assertThat(response).isTrue(); - final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block(); - assertThat(newKeyValue).isEqualTo(value); - if (hasTtl) { - assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0); - } else { - assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1); - } + final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block(); + assertThat(newKeyValue).isEqualTo(value); + if (hasTtl) { + assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0); + } else { + assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1); + } + }); } @Test public void testRename_keyNotExist() { - Integer originalSlot = getSlotForKey(originalKey); - newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + testInClusterReactive(connection -> { + Integer originalSlot = getSlotForKey(originalKey, (RedissonReactiveRedisClusterConnection) connection); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot), connection); - if (sameSlot) { - // This is a quirk of the implementation - since same-slot renames use the non-cluster version, - // the result is a Redis error. This behavior matches other spring-data-redis implementations - assertThatThrownBy(() -> connection.keyCommands().rename(originalKey, newKey).block()) - .isInstanceOf(RedisSystemException.class); + if (sameSlot) { + // This is a quirk of the implementation - since same-slot renames use the non-cluster version, + // the result is a Redis error. This behavior matches other spring-data-redis implementations + assertThatThrownBy(() -> connection.keyCommands().rename(originalKey, newKey).block()) + .isInstanceOf(RedisSystemException.class); - } else { - Boolean response = connection.keyCommands().rename(originalKey, newKey).block(); + } else { + Boolean response = connection.keyCommands().rename(originalKey, newKey).block(); - assertThat(response).isTrue(); + assertThat(response).isTrue(); - final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block(); - assertThat(newKeyValue).isEqualTo(null); - } + final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block(); + assertThat(newKeyValue).isEqualTo(null); + } + }); } - protected ByteBuffer getNewKeyForSlot(String originalKey, Integer targetSlot) { + protected ByteBuffer getNewKeyForSlot(String originalKey, Integer targetSlot, ReactiveRedisClusterConnection connection) { int counter = 0; ByteBuffer newKey = ByteBuffer.wrap((originalKey + counter).getBytes()); - Integer newKeySlot = getSlotForKey(newKey); + Integer newKeySlot = getSlotForKey(newKey, (RedissonReactiveRedisClusterConnection) connection); while(!newKeySlot.equals(targetSlot)) { counter++; newKey = ByteBuffer.wrap((originalKey + counter).getBytes()); - newKeySlot = getSlotForKey(newKey); + newKeySlot = getSlotForKey(newKey, (RedissonReactiveRedisClusterConnection) connection); } return newKey; @@ -157,36 +128,38 @@ public class RedissonReactiveClusterKeyCommandsTest { @Test public void testRenameNX() { - connection.stringCommands().set(originalKey, value).block(); - if (hasTtl) { - connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block(); - } + testInClusterReactive(connection -> { + connection.stringCommands().set(originalKey, value).block(); + if (hasTtl) { + connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block(); + } - Integer originalSlot = getSlotForKey(originalKey); - newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + Integer originalSlot = getSlotForKey(originalKey, (RedissonReactiveRedisClusterConnection) connection); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot), connection); - Boolean result = connection.keyCommands().renameNX(originalKey, newKey).block(); + Boolean result = connection.keyCommands().renameNX(originalKey, newKey).block(); - assertThat(result).isTrue(); - assertThat(connection.stringCommands().get(newKey).block()).isEqualTo(value); - if (hasTtl) { - assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0); - } else { - assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1); - } + assertThat(result).isTrue(); + assertThat(connection.stringCommands().get(newKey).block()).isEqualTo(value); + if (hasTtl) { + assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0); + } else { + assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1); + } - connection.stringCommands().set(originalKey, value).block(); + connection.stringCommands().set(originalKey, value).block(); - result = connection.keyCommands().renameNX(originalKey, newKey).block(); + result = connection.keyCommands().renameNX(originalKey, newKey).block(); - assertThat(result).isFalse(); + assertThat(result).isFalse(); + }); } private Integer getTargetSlot(Integer originalSlot) { return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1; } - private Integer getSlotForKey(ByteBuffer key) { + private Integer getSlotForKey(ByteBuffer key, RedissonReactiveRedisClusterConnection connection) { return (Integer) connection.read(null, StringCodec.INSTANCE, RedisCommands.KEYSLOT, key.array()).block(); } diff --git a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommandsTest.java b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommandsTest.java new file mode 100644 index 000000000..df7e9aef2 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonReactiveKeyCommandsTest.java @@ -0,0 +1,21 @@ +package org.redisson.spring.data.connection; + +import org.junit.Test; +import org.springframework.data.redis.core.ReactiveStringRedisTemplate; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RedissonReactiveKeyCommandsTest extends BaseConnectionTest { + + @Test + public void testExpiration() { + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + ReactiveStringRedisTemplate t = new ReactiveStringRedisTemplate(factory); + t.opsForValue().set("123", "4343").block(); + t.expire("123", Duration.ofMillis(1001)).block(); + assertThat(t.getExpire("123").block().toMillis()).isBetween(900L, 1000L); + } + +}