Merge branch 'master' of github.com:redisson/redisson

pull/3140/head
Nikita Koksharov 4 years ago
commit c4369d73bf

@ -39,6 +39,7 @@ import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.connection.MasterSlaveEntry;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
@ -377,4 +378,66 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
return CONVERTER.convert(list.toArray(new String[list.size()]));
}
@Override
public void rename(byte[] oldName, byte[] newName) {
if (isPipelined()) {
throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline");
}
if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) {
super.rename(oldName, newName);
return;
}
final byte[] value = dump(oldName);
if (null != value) {
final Long sourceTtlInSeconds = ttl(oldName);
final long ttlInMilliseconds;
if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) {
ttlInMilliseconds = sourceTtlInSeconds * 1000;
} else {
ttlInMilliseconds = 0;
}
restore(newName, ttlInMilliseconds, value);
del(oldName);
}
}
@Override
public Boolean renameNX(byte[] oldName, byte[] newName) {
if (isPipelined()) {
throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline");
}
if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) {
return super.renameNX(oldName, newName);
}
final byte[] value = dump(oldName);
if (null != value && !exists(newName)) {
final Long sourceTtlInSeconds = ttl(oldName);
final long ttlInMilliseconds;
if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) {
ttlInMilliseconds = sourceTtlInSeconds * 1000;
} else {
ttlInMilliseconds = 0;
}
restore(newName, ttlInMilliseconds, value);
del(oldName);
return true;
}
return false;
}
}

@ -95,6 +95,9 @@ public class RedissonConnectionFactory implements RedisConnectionFactory, Initia
@Override
public RedisConnection getConnection() {
if (redisson.getConfig().isClusterConfig()) {
return new RedissonClusterConnection(redisson);
}
return new RedissonConnection(redisson);
}

@ -0,0 +1,166 @@
package org.redisson.spring.data.connection;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import java.io.IOException;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@RunWith(Parameterized.class)
public class RedissonClusterConnectionRenameTest {
@Parameterized.Parameters(name= "{index} - same slot = {0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{false},
{true}
});
}
@Parameterized.Parameter(0)
public boolean sameSlot;
static RedissonClient redisson;
static RedissonClusterConnection connection;
static ClusterProcesses process;
byte[] originalKey = "key".getBytes();
byte[] newKey = "unset".getBytes();
byte[] value = "value".getBytes();
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonClusterConnection(redisson);
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@After
public void cleanup() {
connection.del(originalKey);
connection.del(newKey);
}
@Test
public void testRename() {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.rename(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
}
@Test
public void testRename_pipeline() {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.openPipeline();
assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
}
protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot) {
int counter = 0;
byte[] newKey = (new String(originalKey) + counter).getBytes();
Integer newKeySlot = connection.clusterGetSlotForKey(newKey);
while(!newKeySlot.equals(targetSlot)) {
counter++;
newKey = (new String(originalKey) + counter).getBytes();
newKeySlot = connection.clusterGetSlotForKey(newKey);
}
return newKey;
}
@Test
public void testRenameNX() {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
Boolean result = connection.renameNX(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
assertThat(result).isTrue();
connection.set(originalKey, value);
result = connection.renameNX(originalKey, newKey);
assertThat(result).isFalse();
}
@Test
public void testRenameNX_pipeline() {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.openPipeline();
assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
}
private Integer getTargetSlot(Integer originalSlot) {
return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1;
}
}

@ -1,32 +1,34 @@
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.assertj.core.api.Assertions.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisNode.NodeType;
import org.springframework.data.redis.core.types.RedisClientInfo;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.assertj.core.api.Assertions.*;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
public class RedissonClusterConnectionTest {
static RedissonClient redisson;
@ -218,5 +220,12 @@ public class RedissonClusterConnectionTest {
RedisClusterNode master = map.keySet().iterator().next();
return master;
}
@Test
public void testConnectionFactoryReturnsClusterConnection() {
RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson);
assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class);
}
}

@ -39,6 +39,7 @@ import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.connection.MasterSlaveEntry;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
@ -377,4 +378,66 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
return CONVERTER.convert(list.toArray(new String[list.size()]));
}
@Override
public void rename(byte[] oldName, byte[] newName) {
if (isPipelined()) {
throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline");
}
if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) {
super.rename(oldName, newName);
return;
}
final byte[] value = dump(oldName);
if (null != value) {
final Long sourceTtlInSeconds = ttl(oldName);
final long ttlInMilliseconds;
if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) {
ttlInMilliseconds = sourceTtlInSeconds * 1000;
} else {
ttlInMilliseconds = 0;
}
restore(newName, ttlInMilliseconds, value);
del(oldName);
}
}
@Override
public Boolean renameNX(byte[] oldName, byte[] newName) {
if (isPipelined()) {
throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline");
}
if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) {
return super.renameNX(oldName, newName);
}
final byte[] value = dump(oldName);
if (null != value && !exists(newName)) {
final Long sourceTtlInSeconds = ttl(oldName);
final long ttlInMilliseconds;
if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) {
ttlInMilliseconds = sourceTtlInSeconds * 1000;
} else {
ttlInMilliseconds = 0;
}
restore(newName, ttlInMilliseconds, value);
del(oldName);
return true;
}
return false;
}
}

@ -102,6 +102,9 @@ public class RedissonConnectionFactory implements RedisConnectionFactory,
@Override
public RedisConnection getConnection() {
if (redisson.getConfig().isClusterConfig()) {
return new RedissonClusterConnection(redisson);
}
return new RedissonConnection(redisson);
}

@ -0,0 +1,166 @@
package org.redisson.spring.data.connection;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import java.io.IOException;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@RunWith(Parameterized.class)
public class RedissonClusterConnectionRenameTest {
@Parameterized.Parameters(name= "{index} - same slot = {0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{false},
{true}
});
}
@Parameterized.Parameter(0)
public boolean sameSlot;
static RedissonClient redisson;
static RedissonClusterConnection connection;
static ClusterProcesses process;
byte[] originalKey = "key".getBytes();
byte[] newKey = "unset".getBytes();
byte[] value = "value".getBytes();
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonClusterConnection(redisson);
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@After
public void cleanup() {
connection.del(originalKey);
connection.del(newKey);
}
@Test
public void testRename() {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.rename(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
}
@Test
public void testRename_pipeline() {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.openPipeline();
assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
}
protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot) {
int counter = 0;
byte[] newKey = (new String(originalKey) + counter).getBytes();
Integer newKeySlot = connection.clusterGetSlotForKey(newKey);
while(!newKeySlot.equals(targetSlot)) {
counter++;
newKey = (new String(originalKey) + counter).getBytes();
newKeySlot = connection.clusterGetSlotForKey(newKey);
}
return newKey;
}
@Test
public void testRenameNX() {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
Boolean result = connection.renameNX(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
assertThat(result).isTrue();
connection.set(originalKey, value);
result = connection.renameNX(originalKey, newKey);
assertThat(result).isFalse();
}
@Test
public void testRenameNX_pipeline() {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.openPipeline();
assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
}
private Integer getTargetSlot(Integer originalSlot) {
return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1;
}
}

@ -1,32 +1,34 @@
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.assertj.core.api.Assertions.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisNode.NodeType;
import org.springframework.data.redis.core.types.RedisClientInfo;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.assertj.core.api.Assertions.*;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
public class RedissonClusterConnectionTest {
static RedissonClient redisson;
@ -218,5 +220,12 @@ public class RedissonClusterConnectionTest {
RedisClusterNode master = map.keySet().iterator().next();
return master;
}
@Test
public void testConnectionFactoryReturnsClusterConnection() {
RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson);
assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class);
}
}

@ -17,7 +17,6 @@ package org.redisson.spring.data.connection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@ -39,6 +38,7 @@ import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.connection.MasterSlaveEntry;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
@ -382,4 +382,66 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
return CONVERTER.convert(list.toArray(new String[list.size()]));
}
@Override
public void rename(byte[] oldName, byte[] newName) {
if (isPipelined()) {
throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline");
}
if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) {
super.rename(oldName, newName);
return;
}
final byte[] value = dump(oldName);
if (null != value) {
final Long sourceTtlInSeconds = ttl(oldName);
final long ttlInMilliseconds;
if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) {
ttlInMilliseconds = sourceTtlInSeconds * 1000;
} else {
ttlInMilliseconds = 0;
}
restore(newName, ttlInMilliseconds, value);
del(oldName);
}
}
@Override
public Boolean renameNX(byte[] oldName, byte[] newName) {
if (isPipelined()) {
throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline");
}
if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) {
return super.renameNX(oldName, newName);
}
final byte[] value = dump(oldName);
if (null != value && !exists(newName)) {
final Long sourceTtlInSeconds = ttl(oldName);
final long ttlInMilliseconds;
if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) {
ttlInMilliseconds = sourceTtlInSeconds * 1000;
} else {
ttlInMilliseconds = 0;
}
restore(newName, ttlInMilliseconds, value);
del(oldName);
return true;
}
return false;
}
}

@ -100,6 +100,9 @@ public class RedissonConnectionFactory implements RedisConnectionFactory,
@Override
public RedisConnection getConnection() {
if (redisson.getConfig().isClusterConfig()) {
return new RedissonClusterConnection(redisson);
}
return new RedissonConnection(redisson);
}

@ -17,8 +17,10 @@ package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.api.RFuture;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -26,13 +28,18 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.reactive.CommandReactiveExecutor;
import org.springframework.data.redis.connection.ReactiveClusterKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.util.Assert;
import io.netty.util.CharsetUtil;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
/**
*
*
* @author Nikita Koksharov
*
*/
@ -59,4 +66,71 @@ public class RedissonReactiveClusterKeyCommands extends RedissonReactiveKeyComma
return m.map(v -> ByteBuffer.wrap(v));
}
@Override
public Flux<BooleanResponse<RenameCommand>> rename(Publisher<RenameCommand> commands) {
return execute(commands, command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getNewName(), "New name must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
byte[] newKeyBuf = toByteArray(command.getNewName());
if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) {
return super.rename(commands);
}
return read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf)
.filter(Objects::nonNull)
.zipWith(
Mono.defer(() -> pTtl(command.getKey())
.filter(Objects::nonNull)
.map(ttl -> Math.max(0, ttl))
.switchIfEmpty(Mono.just(0L))
)
)
.flatMap(valueAndTtl -> {
return write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1());
})
.thenReturn(new BooleanResponse<>(command, true))
.doOnSuccess((ignored) -> del(command.getKey()));
});
}
@Override
public Flux<ReactiveRedisConnection.BooleanResponse<RenameCommand>> renameNX(Publisher<RenameCommand> commands) {
return execute(commands, command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getNewName(), "New name must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
byte[] newKeyBuf = toByteArray(command.getNewName());
if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) {
return super.renameNX(commands);
}
return exists(command.getNewName())
.zipWith(read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf))
.filter(newKeyExistsAndDump -> !newKeyExistsAndDump.getT1() && Objects.nonNull(newKeyExistsAndDump.getT2()))
.map(Tuple2::getT2)
.zipWhen(value ->
pTtl(command.getKey())
.filter(Objects::nonNull)
.map(ttl -> Math.max(0, ttl))
.switchIfEmpty(Mono.just(0L))
)
.flatMap(valueAndTtl -> write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1())
.then(Mono.just(true)))
.switchIfEmpty(Mono.just(false))
.doOnSuccess(didRename -> {
if (didRename) {
del(command.getKey());
}
})
.map(didRename -> new BooleanResponse<>(command, didRename));
});
}
}

@ -0,0 +1,166 @@
package org.redisson.spring.data.connection;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import java.io.IOException;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@RunWith(Parameterized.class)
public class RedissonClusterConnectionRenameTest {
@Parameterized.Parameters(name= "{index} - same slot = {0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{false},
{true}
});
}
@Parameterized.Parameter(0)
public boolean sameSlot;
static RedissonClient redisson;
static RedissonClusterConnection connection;
static ClusterProcesses process;
byte[] originalKey = "key".getBytes();
byte[] newKey = "unset".getBytes();
byte[] value = "value".getBytes();
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonClusterConnection(redisson);
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@After
public void cleanup() {
connection.del(originalKey);
connection.del(newKey);
}
@Test
public void testRename() {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.rename(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
}
@Test
public void testRename_pipeline() {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.openPipeline();
assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
}
protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot) {
int counter = 0;
byte[] newKey = (new String(originalKey) + counter).getBytes();
Integer newKeySlot = connection.clusterGetSlotForKey(newKey);
while(!newKeySlot.equals(targetSlot)) {
counter++;
newKey = (new String(originalKey) + counter).getBytes();
newKeySlot = connection.clusterGetSlotForKey(newKey);
}
return newKey;
}
@Test
public void testRenameNX() {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
Boolean result = connection.renameNX(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
assertThat(result).isTrue();
connection.set(originalKey, value);
result = connection.renameNX(originalKey, newKey);
assertThat(result).isFalse();
}
@Test
public void testRenameNX_pipeline() {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.openPipeline();
assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
}
private Integer getTargetSlot(Integer originalSlot) {
return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1;
}
}

@ -1,32 +1,32 @@
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.assertj.core.api.Assertions.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisNode.NodeType;
import org.springframework.data.redis.core.types.RedisClientInfo;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.assertj.core.api.Assertions.assertThat;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
public class RedissonClusterConnectionTest {
static RedissonClient redisson;
@ -127,8 +127,8 @@ public class RedissonClusterConnectionTest {
public void testClusterGetClusterInfo() {
ClusterInfo info = connection.clusterGetClusterInfo();
assertThat(info.getSlotsFail()).isEqualTo(0);
assertThat(info.getSlotsOk()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT);
assertThat(info.getSlotsAssigned()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT);
assertThat(info.getSlotsOk()).isEqualTo(MAX_SLOT);
assertThat(info.getSlotsAssigned()).isEqualTo(MAX_SLOT);
}
@Test
@ -218,5 +218,12 @@ public class RedissonClusterConnectionTest {
RedisClusterNode master = map.keySet().iterator().next();
return master;
}
@Test
public void testConnectionFactoryReturnsClusterConnection() {
RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson);
assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class);
}
}

@ -0,0 +1,196 @@
package org.redisson.spring.data.connection;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.RedissonKeys;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.reactive.CommandReactiveService;
import org.springframework.data.redis.RedisSystemException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@RunWith(Parameterized.class)
public class RedissonReactiveClusterKeyCommandsTest {
@Parameterized.Parameters(name= "{index} - same slot = {0}; has ttl = {1}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{false, false},
{true, false},
{false, true},
{true, true}
});
}
@Parameterized.Parameter(0)
public boolean sameSlot;
@Parameterized.Parameter(1)
public boolean hasTtl;
static RedissonClient redisson;
static RedissonReactiveRedisClusterConnection connection;
static ClusterProcesses process;
ByteBuffer originalKey = ByteBuffer.wrap("key".getBytes());
ByteBuffer newKey = ByteBuffer.wrap("unset".getBytes());
ByteBuffer value = ByteBuffer.wrap("value".getBytes());
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys) redisson.getKeys()).getConnectionManager()));
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@After
public void cleanup() {
connection.keyCommands().del(originalKey)
.and(connection.keyCommands().del(newKey))
.block();
}
@Test
public void testRename() {
connection.stringCommands().set(originalKey, value).block();
if (hasTtl) {
connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block();
}
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
Boolean response = connection.keyCommands().rename(originalKey, newKey).block();
assertThat(response).isTrue();
final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block();
assertThat(newKeyValue).isEqualTo(value);
if (hasTtl) {
assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0);
} else {
assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1);
}
}
@Test
public void testRename_keyNotExist() {
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
if (sameSlot) {
// This is a quirk of the implementation - since same-slot renames use the non-cluster version,
// the result is a Redis error. This behavior matches other spring-data-redis implementations
assertThatThrownBy(() -> connection.keyCommands().rename(originalKey, newKey).block())
.isInstanceOf(RedisSystemException.class);
} else {
Boolean response = connection.keyCommands().rename(originalKey, newKey).block();
assertThat(response).isTrue();
final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block();
assertThat(newKeyValue).isEqualTo(null);
}
}
protected ByteBuffer getNewKeyForSlot(String originalKey, Integer targetSlot) {
int counter = 0;
ByteBuffer newKey = ByteBuffer.wrap((originalKey + counter).getBytes());
Integer newKeySlot = getSlotForKey(newKey);
while(!newKeySlot.equals(targetSlot)) {
counter++;
newKey = ByteBuffer.wrap((originalKey + counter).getBytes());
newKeySlot = getSlotForKey(newKey);
}
return newKey;
}
@Test
public void testRenameNX() {
connection.stringCommands().set(originalKey, value).block();
if (hasTtl) {
connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block();
}
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
Boolean result = connection.keyCommands().renameNX(originalKey, newKey).block();
assertThat(result).isTrue();
assertThat(connection.stringCommands().get(newKey).block()).isEqualTo(value);
if (hasTtl) {
assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0);
} else {
assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1);
}
connection.stringCommands().set(originalKey, value).block();
result = connection.keyCommands().renameNX(originalKey, newKey).block();
assertThat(result).isFalse();
}
private Integer getTargetSlot(Integer originalSlot) {
return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1;
}
private Integer getSlotForKey(ByteBuffer key) {
return (Integer) connection.read(null, StringCodec.INSTANCE, RedisCommands.KEYSLOT, key.array()).block();
}
}

@ -40,6 +40,7 @@ import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.connection.MasterSlaveEntry;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.DefaultedRedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
@ -430,4 +431,66 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
}.open();
}
@Override
public void rename(byte[] oldName, byte[] newName) {
if (isPipelined()) {
throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline");
}
if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) {
super.rename(oldName, newName);
return;
}
final byte[] value = dump(oldName);
if (null != value) {
final Long sourceTtlInSeconds = ttl(oldName);
final long ttlInMilliseconds;
if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) {
ttlInMilliseconds = sourceTtlInSeconds * 1000;
} else {
ttlInMilliseconds = 0;
}
restore(newName, ttlInMilliseconds, value);
del(oldName);
}
}
@Override
public Boolean renameNX(byte[] oldName, byte[] newName) {
if (isPipelined()) {
throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline");
}
if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) {
return super.renameNX(oldName, newName);
}
final byte[] value = dump(oldName);
if (null != value && !exists(newName)) {
final Long sourceTtlInSeconds = ttl(oldName);
final long ttlInMilliseconds;
if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) {
ttlInMilliseconds = sourceTtlInSeconds * 1000;
} else {
ttlInMilliseconds = 0;
}
restore(newName, ttlInMilliseconds, value);
del(oldName);
return true;
}
return false;
}
}

@ -100,6 +100,9 @@ public class RedissonConnectionFactory implements RedisConnectionFactory,
@Override
public RedisConnection getConnection() {
if (redisson.getConfig().isClusterConfig()) {
return new RedissonClusterConnection(redisson);
}
return new RedissonConnection(redisson);
}

@ -17,8 +17,10 @@ package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.api.RFuture;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -26,13 +28,18 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.reactive.CommandReactiveExecutor;
import org.springframework.data.redis.connection.ReactiveClusterKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.util.Assert;
import io.netty.util.CharsetUtil;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
/**
*
*
* @author Nikita Koksharov
*
*/
@ -59,4 +66,71 @@ public class RedissonReactiveClusterKeyCommands extends RedissonReactiveKeyComma
return m.map(v -> ByteBuffer.wrap(v));
}
@Override
public Flux<BooleanResponse<RenameCommand>> rename(Publisher<RenameCommand> commands) {
return execute(commands, command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getNewName(), "New name must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
byte[] newKeyBuf = toByteArray(command.getNewName());
if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) {
return super.rename(commands);
}
return read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf)
.filter(Objects::nonNull)
.zipWith(
Mono.defer(() -> pTtl(command.getKey())
.filter(Objects::nonNull)
.map(ttl -> Math.max(0, ttl))
.switchIfEmpty(Mono.just(0L))
)
)
.flatMap(valueAndTtl -> {
return write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1());
})
.thenReturn(new BooleanResponse<>(command, true))
.doOnSuccess((ignored) -> del(command.getKey()));
});
}
@Override
public Flux<ReactiveRedisConnection.BooleanResponse<RenameCommand>> renameNX(Publisher<RenameCommand> commands) {
return execute(commands, command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getNewName(), "New name must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
byte[] newKeyBuf = toByteArray(command.getNewName());
if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) {
return super.renameNX(commands);
}
return exists(command.getNewName())
.zipWith(read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf))
.filter(newKeyExistsAndDump -> !newKeyExistsAndDump.getT1() && Objects.nonNull(newKeyExistsAndDump.getT2()))
.map(Tuple2::getT2)
.zipWhen(value ->
pTtl(command.getKey())
.filter(Objects::nonNull)
.map(ttl -> Math.max(0, ttl))
.switchIfEmpty(Mono.just(0L))
)
.flatMap(valueAndTtl -> write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1())
.then(Mono.just(true)))
.switchIfEmpty(Mono.just(false))
.doOnSuccess(didRename -> {
if (didRename) {
del(command.getKey());
}
})
.map(didRename -> new BooleanResponse<>(command, didRename));
});
}
}

@ -0,0 +1,166 @@
package org.redisson.spring.data.connection;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import java.io.IOException;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@RunWith(Parameterized.class)
public class RedissonClusterConnectionRenameTest {
@Parameterized.Parameters(name= "{index} - same slot = {0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{false},
{true}
});
}
@Parameterized.Parameter(0)
public boolean sameSlot;
static RedissonClient redisson;
static RedissonClusterConnection connection;
static ClusterProcesses process;
byte[] originalKey = "key".getBytes();
byte[] newKey = "unset".getBytes();
byte[] value = "value".getBytes();
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonClusterConnection(redisson);
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@After
public void cleanup() {
connection.del(originalKey);
connection.del(newKey);
}
@Test
public void testRename() {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.rename(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
}
@Test
public void testRename_pipeline() {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.openPipeline();
assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
}
protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot) {
int counter = 0;
byte[] newKey = (new String(originalKey) + counter).getBytes();
Integer newKeySlot = connection.clusterGetSlotForKey(newKey);
while(!newKeySlot.equals(targetSlot)) {
counter++;
newKey = (new String(originalKey) + counter).getBytes();
newKeySlot = connection.clusterGetSlotForKey(newKey);
}
return newKey;
}
@Test
public void testRenameNX() {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
Boolean result = connection.renameNX(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
assertThat(result).isTrue();
connection.set(originalKey, value);
result = connection.renameNX(originalKey, newKey);
assertThat(result).isFalse();
}
@Test
public void testRenameNX_pipeline() {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.openPipeline();
assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
}
private Integer getTargetSlot(Integer originalSlot) {
return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1;
}
}

@ -1,32 +1,34 @@
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.assertj.core.api.Assertions.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisNode.NodeType;
import org.springframework.data.redis.core.types.RedisClientInfo;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.assertj.core.api.Assertions.*;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
public class RedissonClusterConnectionTest {
static RedissonClient redisson;
@ -218,5 +220,12 @@ public class RedissonClusterConnectionTest {
RedisClusterNode master = map.keySet().iterator().next();
return master;
}
@Test
public void testConnectionFactoryReturnsClusterConnection() {
RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson);
assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class);
}
}

@ -0,0 +1,195 @@
package org.redisson.spring.data.connection;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.RedissonKeys;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.reactive.CommandReactiveService;
import org.springframework.data.redis.RedisSystemException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@RunWith(Parameterized.class)
public class RedissonReactiveClusterKeyCommandsTest {
@Parameterized.Parameters(name= "{index} - same slot = {0}; has ttl = {1}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{false, false},
{true, false},
{false, true},
{true, true}
});
}
@Parameterized.Parameter(0)
public boolean sameSlot;
@Parameterized.Parameter(1)
public boolean hasTtl;
static RedissonClient redisson;
static RedissonReactiveRedisClusterConnection connection;
static ClusterProcesses process;
ByteBuffer originalKey = ByteBuffer.wrap("key".getBytes());
ByteBuffer newKey = ByteBuffer.wrap("unset".getBytes());
ByteBuffer value = ByteBuffer.wrap("value".getBytes());
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys) redisson.getKeys()).getConnectionManager()));
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@After
public void cleanup() {
connection.keyCommands().del(originalKey)
.and(connection.keyCommands().del(newKey))
.block();
}
@Test
public void testRename() {
connection.stringCommands().set(originalKey, value).block();
if (hasTtl) {
connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block();
}
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
Boolean response = connection.keyCommands().rename(originalKey, newKey).block();
assertThat(response).isTrue();
final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block();
assertThat(newKeyValue).isEqualTo(value);
if (hasTtl) {
assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0);
} else {
assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1);
}
}
@Test
public void testRename_keyNotExist() {
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
if (sameSlot) {
// This is a quirk of the implementation - since same-slot renames use the non-cluster version,
// the result is a Redis error. This behavior matches other spring-data-redis implementations
assertThatThrownBy(() -> connection.keyCommands().rename(originalKey, newKey).block())
.isInstanceOf(RedisSystemException.class);
} else {
Boolean response = connection.keyCommands().rename(originalKey, newKey).block();
assertThat(response).isTrue();
final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block();
assertThat(newKeyValue).isEqualTo(null);
}
}
protected ByteBuffer getNewKeyForSlot(String originalKey, Integer targetSlot) {
int counter = 0;
ByteBuffer newKey = ByteBuffer.wrap((originalKey + counter).getBytes());
Integer newKeySlot = getSlotForKey(newKey);
while(!newKeySlot.equals(targetSlot)) {
counter++;
newKey = ByteBuffer.wrap((originalKey + counter).getBytes());
newKeySlot = getSlotForKey(newKey);
}
return newKey;
}
@Test
public void testRenameNX() {
connection.stringCommands().set(originalKey, value).block();
if (hasTtl) {
connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block();
}
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
Boolean result = connection.keyCommands().renameNX(originalKey, newKey).block();
assertThat(result).isTrue();
assertThat(connection.stringCommands().get(newKey).block()).isEqualTo(value);
if (hasTtl) {
assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0);
} else {
assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1);
}
connection.stringCommands().set(originalKey, value).block();
result = connection.keyCommands().renameNX(originalKey, newKey).block();
assertThat(result).isFalse();
}
private Integer getTargetSlot(Integer originalSlot) {
return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1;
}
private Integer getSlotForKey(ByteBuffer key) {
return (Integer) connection.read(null, StringCodec.INSTANCE, RedisCommands.KEYSLOT, key.array()).block();
}
}

@ -40,6 +40,7 @@ import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.connection.MasterSlaveEntry;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.DefaultedRedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
@ -430,4 +431,66 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
}.open();
}
@Override
public void rename(byte[] oldName, byte[] newName) {
if (isPipelined()) {
throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline");
}
if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) {
super.rename(oldName, newName);
return;
}
final byte[] value = dump(oldName);
if (null != value) {
final Long sourceTtlInSeconds = ttl(oldName);
final long ttlInMilliseconds;
if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) {
ttlInMilliseconds = sourceTtlInSeconds * 1000;
} else {
ttlInMilliseconds = 0;
}
restore(newName, ttlInMilliseconds, value);
del(oldName);
}
}
@Override
public Boolean renameNX(byte[] oldName, byte[] newName) {
if (isPipelined()) {
throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline");
}
if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) {
return super.renameNX(oldName, newName);
}
final byte[] value = dump(oldName);
if (null != value && !exists(newName)) {
final Long sourceTtlInSeconds = ttl(oldName);
final long ttlInMilliseconds;
if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) {
ttlInMilliseconds = sourceTtlInSeconds * 1000;
} else {
ttlInMilliseconds = 0;
}
restore(newName, ttlInMilliseconds, value);
del(oldName);
return true;
}
return false;
}
}

@ -106,6 +106,9 @@ public class RedissonConnectionFactory implements RedisConnectionFactory,
@Override
public RedisConnection getConnection() {
if (redisson.getConfig().isClusterConfig()) {
return new RedissonClusterConnection(redisson);
}
return new RedissonConnection(redisson);
}

@ -17,8 +17,10 @@ package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.api.RFuture;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -26,13 +28,18 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.reactive.CommandReactiveExecutor;
import org.springframework.data.redis.connection.ReactiveClusterKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.util.Assert;
import io.netty.util.CharsetUtil;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
/**
*
*
* @author Nikita Koksharov
*
*/
@ -59,4 +66,71 @@ public class RedissonReactiveClusterKeyCommands extends RedissonReactiveKeyComma
return m.map(v -> ByteBuffer.wrap(v));
}
@Override
public Flux<BooleanResponse<RenameCommand>> rename(Publisher<RenameCommand> commands) {
return execute(commands, command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getNewName(), "New name must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
byte[] newKeyBuf = toByteArray(command.getNewName());
if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) {
return super.rename(commands);
}
return read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf)
.filter(Objects::nonNull)
.zipWith(
Mono.defer(() -> pTtl(command.getKey())
.filter(Objects::nonNull)
.map(ttl -> Math.max(0, ttl))
.switchIfEmpty(Mono.just(0L))
)
)
.flatMap(valueAndTtl -> {
return write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1());
})
.thenReturn(new BooleanResponse<>(command, true))
.doOnSuccess((ignored) -> del(command.getKey()));
});
}
@Override
public Flux<ReactiveRedisConnection.BooleanResponse<RenameCommand>> renameNX(Publisher<RenameCommand> commands) {
return execute(commands, command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getNewName(), "New name must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
byte[] newKeyBuf = toByteArray(command.getNewName());
if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) {
return super.renameNX(commands);
}
return exists(command.getNewName())
.zipWith(read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf))
.filter(newKeyExistsAndDump -> !newKeyExistsAndDump.getT1() && Objects.nonNull(newKeyExistsAndDump.getT2()))
.map(Tuple2::getT2)
.zipWhen(value ->
pTtl(command.getKey())
.filter(Objects::nonNull)
.map(ttl -> Math.max(0, ttl))
.switchIfEmpty(Mono.just(0L))
)
.flatMap(valueAndTtl -> write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1())
.then(Mono.just(true)))
.switchIfEmpty(Mono.just(false))
.doOnSuccess(didRename -> {
if (didRename) {
del(command.getKey());
}
})
.map(didRename -> new BooleanResponse<>(command, didRename));
});
}
}

@ -0,0 +1,166 @@
package org.redisson.spring.data.connection;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import java.io.IOException;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@RunWith(Parameterized.class)
public class RedissonClusterConnectionRenameTest {
@Parameterized.Parameters(name= "{index} - same slot = {0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{false},
{true}
});
}
@Parameterized.Parameter(0)
public boolean sameSlot;
static RedissonClient redisson;
static RedissonClusterConnection connection;
static ClusterProcesses process;
byte[] originalKey = "key".getBytes();
byte[] newKey = "unset".getBytes();
byte[] value = "value".getBytes();
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonClusterConnection(redisson);
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@After
public void cleanup() {
connection.del(originalKey);
connection.del(newKey);
}
@Test
public void testRename() {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.rename(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
}
@Test
public void testRename_pipeline() {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.openPipeline();
assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
}
protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot) {
int counter = 0;
byte[] newKey = (new String(originalKey) + counter).getBytes();
Integer newKeySlot = connection.clusterGetSlotForKey(newKey);
while(!newKeySlot.equals(targetSlot)) {
counter++;
newKey = (new String(originalKey) + counter).getBytes();
newKeySlot = connection.clusterGetSlotForKey(newKey);
}
return newKey;
}
@Test
public void testRenameNX() {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
Boolean result = connection.renameNX(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
assertThat(result).isTrue();
connection.set(originalKey, value);
result = connection.renameNX(originalKey, newKey);
assertThat(result).isFalse();
}
@Test
public void testRenameNX_pipeline() {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.openPipeline();
assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
}
private Integer getTargetSlot(Integer originalSlot) {
return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1;
}
}

@ -1,32 +1,34 @@
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.assertj.core.api.Assertions.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisNode.NodeType;
import org.springframework.data.redis.core.types.RedisClientInfo;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.assertj.core.api.Assertions.*;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
public class RedissonClusterConnectionTest {
static RedissonClient redisson;
@ -218,5 +220,12 @@ public class RedissonClusterConnectionTest {
RedisClusterNode master = map.keySet().iterator().next();
return master;
}
@Test
public void testConnectionFactoryReturnsClusterConnection() {
RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson);
assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class);
}
}

@ -0,0 +1,195 @@
package org.redisson.spring.data.connection;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.RedissonKeys;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.reactive.CommandReactiveService;
import org.springframework.data.redis.RedisSystemException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@RunWith(Parameterized.class)
public class RedissonReactiveClusterKeyCommandsTest {
@Parameterized.Parameters(name= "{index} - same slot = {0}; has ttl = {1}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{false, false},
{true, false},
{false, true},
{true, true}
});
}
@Parameterized.Parameter(0)
public boolean sameSlot;
@Parameterized.Parameter(1)
public boolean hasTtl;
static RedissonClient redisson;
static RedissonReactiveRedisClusterConnection connection;
static ClusterProcesses process;
ByteBuffer originalKey = ByteBuffer.wrap("key".getBytes());
ByteBuffer newKey = ByteBuffer.wrap("unset".getBytes());
ByteBuffer value = ByteBuffer.wrap("value".getBytes());
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys) redisson.getKeys()).getConnectionManager()));
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@After
public void cleanup() {
connection.keyCommands().del(originalKey)
.and(connection.keyCommands().del(newKey))
.block();
}
@Test
public void testRename() {
connection.stringCommands().set(originalKey, value).block();
if (hasTtl) {
connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block();
}
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
Boolean response = connection.keyCommands().rename(originalKey, newKey).block();
assertThat(response).isTrue();
final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block();
assertThat(newKeyValue).isEqualTo(value);
if (hasTtl) {
assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0);
} else {
assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1);
}
}
@Test
public void testRename_keyNotExist() {
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
if (sameSlot) {
// This is a quirk of the implementation - since same-slot renames use the non-cluster version,
// the result is a Redis error. This behavior matches other spring-data-redis implementations
assertThatThrownBy(() -> connection.keyCommands().rename(originalKey, newKey).block())
.isInstanceOf(RedisSystemException.class);
} else {
Boolean response = connection.keyCommands().rename(originalKey, newKey).block();
assertThat(response).isTrue();
final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block();
assertThat(newKeyValue).isEqualTo(null);
}
}
protected ByteBuffer getNewKeyForSlot(String originalKey, Integer targetSlot) {
int counter = 0;
ByteBuffer newKey = ByteBuffer.wrap((originalKey + counter).getBytes());
Integer newKeySlot = getSlotForKey(newKey);
while(!newKeySlot.equals(targetSlot)) {
counter++;
newKey = ByteBuffer.wrap((originalKey + counter).getBytes());
newKeySlot = getSlotForKey(newKey);
}
return newKey;
}
@Test
public void testRenameNX() {
connection.stringCommands().set(originalKey, value).block();
if (hasTtl) {
connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block();
}
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
Boolean result = connection.keyCommands().renameNX(originalKey, newKey).block();
assertThat(result).isTrue();
assertThat(connection.stringCommands().get(newKey).block()).isEqualTo(value);
if (hasTtl) {
assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0);
} else {
assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1);
}
connection.stringCommands().set(originalKey, value).block();
result = connection.keyCommands().renameNX(originalKey, newKey).block();
assertThat(result).isFalse();
}
private Integer getTargetSlot(Integer originalSlot) {
return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1;
}
private Integer getSlotForKey(ByteBuffer key) {
return (Integer) connection.read(null, StringCodec.INSTANCE, RedisCommands.KEYSLOT, key.array()).block();
}
}

@ -40,6 +40,7 @@ import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.connection.MasterSlaveEntry;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.DefaultedRedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
@ -430,4 +431,66 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
}.open();
}
@Override
public void rename(byte[] oldName, byte[] newName) {
if (isPipelined()) {
throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline");
}
if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) {
super.rename(oldName, newName);
return;
}
final byte[] value = dump(oldName);
if (null != value) {
final Long sourceTtlInSeconds = ttl(oldName);
final long ttlInMilliseconds;
if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) {
ttlInMilliseconds = sourceTtlInSeconds * 1000;
} else {
ttlInMilliseconds = 0;
}
restore(newName, ttlInMilliseconds, value);
del(oldName);
}
}
@Override
public Boolean renameNX(byte[] oldName, byte[] newName) {
if (isPipelined()) {
throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline");
}
if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) {
return super.renameNX(oldName, newName);
}
final byte[] value = dump(oldName);
if (null != value && !exists(newName)) {
final Long sourceTtlInSeconds = ttl(oldName);
final long ttlInMilliseconds;
if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) {
ttlInMilliseconds = sourceTtlInSeconds * 1000;
} else {
ttlInMilliseconds = 0;
}
restore(newName, ttlInMilliseconds, value);
del(oldName);
return true;
}
return false;
}
}

@ -106,6 +106,9 @@ public class RedissonConnectionFactory implements RedisConnectionFactory,
@Override
public RedisConnection getConnection() {
if (redisson.getConfig().isClusterConfig()) {
return new RedissonClusterConnection(redisson);
}
return new RedissonConnection(redisson);
}

@ -17,8 +17,10 @@ package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.api.RFuture;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -26,13 +28,18 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.reactive.CommandReactiveExecutor;
import org.springframework.data.redis.connection.ReactiveClusterKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.util.Assert;
import io.netty.util.CharsetUtil;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
/**
*
*
* @author Nikita Koksharov
*
*/
@ -59,4 +66,71 @@ public class RedissonReactiveClusterKeyCommands extends RedissonReactiveKeyComma
return m.map(v -> ByteBuffer.wrap(v));
}
@Override
public Flux<BooleanResponse<RenameCommand>> rename(Publisher<RenameCommand> commands) {
return execute(commands, command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getNewName(), "New name must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
byte[] newKeyBuf = toByteArray(command.getNewName());
if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) {
return super.rename(commands);
}
return read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf)
.filter(Objects::nonNull)
.zipWith(
Mono.defer(() -> pTtl(command.getKey())
.filter(Objects::nonNull)
.map(ttl -> Math.max(0, ttl))
.switchIfEmpty(Mono.just(0L))
)
)
.flatMap(valueAndTtl -> {
return write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1());
})
.thenReturn(new BooleanResponse<>(command, true))
.doOnSuccess((ignored) -> del(command.getKey()));
});
}
@Override
public Flux<ReactiveRedisConnection.BooleanResponse<RenameCommand>> renameNX(Publisher<RenameCommand> commands) {
return execute(commands, command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getNewName(), "New name must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
byte[] newKeyBuf = toByteArray(command.getNewName());
if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) {
return super.renameNX(commands);
}
return exists(command.getNewName())
.zipWith(read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf))
.filter(newKeyExistsAndDump -> !newKeyExistsAndDump.getT1() && Objects.nonNull(newKeyExistsAndDump.getT2()))
.map(Tuple2::getT2)
.zipWhen(value ->
pTtl(command.getKey())
.filter(Objects::nonNull)
.map(ttl -> Math.max(0, ttl))
.switchIfEmpty(Mono.just(0L))
)
.flatMap(valueAndTtl -> write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1())
.then(Mono.just(true)))
.switchIfEmpty(Mono.just(false))
.doOnSuccess(didRename -> {
if (didRename) {
del(command.getKey());
}
})
.map(didRename -> new BooleanResponse<>(command, didRename));
});
}
}

@ -0,0 +1,166 @@
package org.redisson.spring.data.connection;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import java.io.IOException;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@RunWith(Parameterized.class)
public class RedissonClusterConnectionRenameTest {
@Parameterized.Parameters(name= "{index} - same slot = {0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{false},
{true}
});
}
@Parameterized.Parameter(0)
public boolean sameSlot;
static RedissonClient redisson;
static RedissonClusterConnection connection;
static ClusterProcesses process;
byte[] originalKey = "key".getBytes();
byte[] newKey = "unset".getBytes();
byte[] value = "value".getBytes();
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonClusterConnection(redisson);
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@After
public void cleanup() {
connection.del(originalKey);
connection.del(newKey);
}
@Test
public void testRename() {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.rename(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
}
@Test
public void testRename_pipeline() {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.openPipeline();
assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
}
protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot) {
int counter = 0;
byte[] newKey = (new String(originalKey) + counter).getBytes();
Integer newKeySlot = connection.clusterGetSlotForKey(newKey);
while(!newKeySlot.equals(targetSlot)) {
counter++;
newKey = (new String(originalKey) + counter).getBytes();
newKeySlot = connection.clusterGetSlotForKey(newKey);
}
return newKey;
}
@Test
public void testRenameNX() {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
Boolean result = connection.renameNX(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
assertThat(result).isTrue();
connection.set(originalKey, value);
result = connection.renameNX(originalKey, newKey);
assertThat(result).isFalse();
}
@Test
public void testRenameNX_pipeline() {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
connection.openPipeline();
assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
}
private Integer getTargetSlot(Integer originalSlot) {
return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1;
}
}

@ -1,32 +1,34 @@
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.assertj.core.api.Assertions.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisNode.NodeType;
import org.springframework.data.redis.core.types.RedisClientInfo;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.assertj.core.api.Assertions.*;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
public class RedissonClusterConnectionTest {
static RedissonClient redisson;
@ -218,5 +220,12 @@ public class RedissonClusterConnectionTest {
RedisClusterNode master = map.keySet().iterator().next();
return master;
}
@Test
public void testConnectionFactoryReturnsClusterConnection() {
RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson);
assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class);
}
}

@ -0,0 +1,196 @@
package org.redisson.spring.data.connection;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.RedissonKeys;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.reactive.CommandReactiveService;
import org.springframework.data.redis.RedisSystemException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@RunWith(Parameterized.class)
public class RedissonReactiveClusterKeyCommandsTest {
@Parameterized.Parameters(name= "{index} - same slot = {0}; has ttl = {1}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{false, false},
{true, false},
{false, true},
{true, true}
});
}
@Parameterized.Parameter(0)
public boolean sameSlot;
@Parameterized.Parameter(1)
public boolean hasTtl;
static RedissonClient redisson;
static RedissonReactiveRedisClusterConnection connection;
static ClusterProcesses process;
ByteBuffer originalKey = ByteBuffer.wrap("key".getBytes());
ByteBuffer newKey = ByteBuffer.wrap("unset".getBytes());
ByteBuffer value = ByteBuffer.wrap("value".getBytes());
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys) redisson.getKeys()).getConnectionManager()));
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@After
public void cleanup() {
connection.keyCommands().del(originalKey)
.and(connection.keyCommands().del(newKey))
.block();
}
@Test
public void testRename() {
connection.stringCommands().set(originalKey, value).block();
if (hasTtl) {
connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block();
}
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
Boolean response = connection.keyCommands().rename(originalKey, newKey).block();
assertThat(response).isTrue();
final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block();
assertThat(newKeyValue).isEqualTo(value);
if (hasTtl) {
assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0);
} else {
assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1);
}
}
@Test
public void testRename_keyNotExist() {
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
if (sameSlot) {
// This is a quirk of the implementation - since same-slot renames use the non-cluster version,
// the result is a Redis error. This behavior matches other spring-data-redis implementations
assertThatThrownBy(() -> connection.keyCommands().rename(originalKey, newKey).block())
.isInstanceOf(RedisSystemException.class);
} else {
Boolean response = connection.keyCommands().rename(originalKey, newKey).block();
assertThat(response).isTrue();
final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block();
assertThat(newKeyValue).isEqualTo(null);
}
}
protected ByteBuffer getNewKeyForSlot(String originalKey, Integer targetSlot) {
int counter = 0;
ByteBuffer newKey = ByteBuffer.wrap((originalKey + counter).getBytes());
Integer newKeySlot = getSlotForKey(newKey);
while(!newKeySlot.equals(targetSlot)) {
counter++;
newKey = ByteBuffer.wrap((originalKey + counter).getBytes());
newKeySlot = getSlotForKey(newKey);
}
return newKey;
}
@Test
public void testRenameNX() {
connection.stringCommands().set(originalKey, value).block();
if (hasTtl) {
connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block();
}
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
Boolean result = connection.keyCommands().renameNX(originalKey, newKey).block();
assertThat(result).isTrue();
assertThat(connection.stringCommands().get(newKey).block()).isEqualTo(value);
if (hasTtl) {
assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0);
} else {
assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1);
}
connection.stringCommands().set(originalKey, value).block();
result = connection.keyCommands().renameNX(originalKey, newKey).block();
assertThat(result).isFalse();
}
private Integer getTargetSlot(Integer originalSlot) {
return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1;
}
private Integer getSlotForKey(ByteBuffer key) {
return (Integer) connection.read(null, StringCodec.INSTANCE, RedisCommands.KEYSLOT, key.array()).block();
}
}
Loading…
Cancel
Save