diff --git a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java index 1033939da..935b01c90 100644 --- a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -39,6 +39,7 @@ import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; import org.redisson.connection.MasterSlaveEntry; +import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.RedisClusterConnection; import org.springframework.data.redis.connection.RedisClusterNode; @@ -377,4 +378,66 @@ public class RedissonClusterConnection extends RedissonConnection implements Red return CONVERTER.convert(list.toArray(new String[list.size()])); } + @Override + public void rename(byte[] oldName, byte[] newName) { + + if (isPipelined()) { + throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline"); + } + + if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) { + super.rename(oldName, newName); + return; + } + + final byte[] value = dump(oldName); + + if (null != value) { + + final Long sourceTtlInSeconds = ttl(oldName); + + final long ttlInMilliseconds; + if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) { + ttlInMilliseconds = sourceTtlInSeconds * 1000; + } else { + ttlInMilliseconds = 0; + } + + restore(newName, ttlInMilliseconds, value); + del(oldName); + } + } + + @Override + public Boolean renameNX(byte[] oldName, byte[] newName) { + if (isPipelined()) { + throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline"); + } + + if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) { + return super.renameNX(oldName, newName); + } + + final byte[] value = dump(oldName); + + if (null != value && !exists(newName)) { + + final Long sourceTtlInSeconds = ttl(oldName); + + final long ttlInMilliseconds; + if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) { + ttlInMilliseconds = sourceTtlInSeconds * 1000; + } else { + ttlInMilliseconds = 0; + } + + restore(newName, ttlInMilliseconds, value); + del(oldName); + + return true; + } + + return false; + } + } diff --git a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java index c63fb2be1..e440da622 100644 --- a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java +++ b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java @@ -95,6 +95,9 @@ public class RedissonConnectionFactory implements RedisConnectionFactory, Initia @Override public RedisConnection getConnection() { + if (redisson.getConfig().isClusterConfig()) { + return new RedissonClusterConnection(redisson); + } return new RedissonConnection(redisson); } diff --git a/redisson-spring-data/redisson-spring-data-17/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java b/redisson-spring-data/redisson-spring-data-17/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java new file mode 100644 index 000000000..ed8a07bb7 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-17/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java @@ -0,0 +1,166 @@ +package org.redisson.spring.data.connection; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; +import org.redisson.RedisRunner; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.config.SubscriptionMode; +import org.redisson.connection.balancer.RandomLoadBalancer; +import org.springframework.dao.InvalidDataAccessResourceUsageException; + +import java.io.IOException; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + +@RunWith(Parameterized.class) +public class RedissonClusterConnectionRenameTest { + + @Parameterized.Parameters(name= "{index} - same slot = {0}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {false}, + {true} + }); + } + + @Parameterized.Parameter(0) + public boolean sameSlot; + + static RedissonClient redisson; + static RedissonClusterConnection connection; + static ClusterProcesses process; + + byte[] originalKey = "key".getBytes(); + byte[] newKey = "unset".getBytes(); + byte[] value = "value".getBytes(); + + @BeforeClass + public static void before() throws FailedToStartRedisException, IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setSubscriptionMode(SubscriptionMode.SLAVE) + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + + redisson = Redisson.create(config); + connection = new RedissonClusterConnection(redisson); + } + + @AfterClass + public static void after() { + process.shutdown(); + redisson.shutdown(); + } + + @After + public void cleanup() { + connection.del(originalKey); + connection.del(newKey); + } + + @Test + public void testRename() { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.rename(originalKey, newKey); + + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + } + + @Test + public void testRename_pipeline() { + connection.set(originalKey, value); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.openPipeline(); + assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + } + + protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot) { + int counter = 0; + + byte[] newKey = (new String(originalKey) + counter).getBytes(); + + Integer newKeySlot = connection.clusterGetSlotForKey(newKey); + + while(!newKeySlot.equals(targetSlot)) { + counter++; + newKey = (new String(originalKey) + counter).getBytes(); + newKeySlot = connection.clusterGetSlotForKey(newKey); + } + + return newKey; + } + + @Test + public void testRenameNX() { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + Boolean result = connection.renameNX(originalKey, newKey); + + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + assertThat(result).isTrue(); + + connection.set(originalKey, value); + + result = connection.renameNX(originalKey, newKey); + + assertThat(result).isFalse(); + } + + @Test + public void testRenameNX_pipeline() { + connection.set(originalKey, value); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.openPipeline(); + assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + } + + private Integer getTargetSlot(Integer originalSlot) { + return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1; + } + +} diff --git a/redisson-spring-data/redisson-spring-data-17/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java b/redisson-spring-data/redisson-spring-data-17/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java index df6bc56fa..78068bc94 100644 --- a/redisson-spring-data/redisson-spring-data-17/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java +++ b/redisson-spring-data/redisson-spring-data-17/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java @@ -1,32 +1,34 @@ package org.redisson.spring.data.connection; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import static org.assertj.core.api.Assertions.*; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner; import org.redisson.RedisRunner.FailedToStartRedisException; import org.redisson.Redisson; -import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.balancer.RandomLoadBalancer; +import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.RedisClusterNode; +import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisNode.NodeType; import org.springframework.data.redis.core.types.RedisClientInfo; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.*; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + public class RedissonClusterConnectionTest { static RedissonClient redisson; @@ -218,5 +220,12 @@ public class RedissonClusterConnectionTest { RedisClusterNode master = map.keySet().iterator().next(); return master; } - + + @Test + public void testConnectionFactoryReturnsClusterConnection() { + RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson); + + assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class); + } + } diff --git a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java index 1033939da..935b01c90 100644 --- a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -39,6 +39,7 @@ import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; import org.redisson.connection.MasterSlaveEntry; +import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.RedisClusterConnection; import org.springframework.data.redis.connection.RedisClusterNode; @@ -377,4 +378,66 @@ public class RedissonClusterConnection extends RedissonConnection implements Red return CONVERTER.convert(list.toArray(new String[list.size()])); } + @Override + public void rename(byte[] oldName, byte[] newName) { + + if (isPipelined()) { + throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline"); + } + + if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) { + super.rename(oldName, newName); + return; + } + + final byte[] value = dump(oldName); + + if (null != value) { + + final Long sourceTtlInSeconds = ttl(oldName); + + final long ttlInMilliseconds; + if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) { + ttlInMilliseconds = sourceTtlInSeconds * 1000; + } else { + ttlInMilliseconds = 0; + } + + restore(newName, ttlInMilliseconds, value); + del(oldName); + } + } + + @Override + public Boolean renameNX(byte[] oldName, byte[] newName) { + if (isPipelined()) { + throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline"); + } + + if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) { + return super.renameNX(oldName, newName); + } + + final byte[] value = dump(oldName); + + if (null != value && !exists(newName)) { + + final Long sourceTtlInSeconds = ttl(oldName); + + final long ttlInMilliseconds; + if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) { + ttlInMilliseconds = sourceTtlInSeconds * 1000; + } else { + ttlInMilliseconds = 0; + } + + restore(newName, ttlInMilliseconds, value); + del(oldName); + + return true; + } + + return false; + } + } diff --git a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java index 3b1146c13..2511ad0a9 100644 --- a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java +++ b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java @@ -102,6 +102,9 @@ public class RedissonConnectionFactory implements RedisConnectionFactory, @Override public RedisConnection getConnection() { + if (redisson.getConfig().isClusterConfig()) { + return new RedissonClusterConnection(redisson); + } return new RedissonConnection(redisson); } diff --git a/redisson-spring-data/redisson-spring-data-18/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java b/redisson-spring-data/redisson-spring-data-18/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java new file mode 100644 index 000000000..ed8a07bb7 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-18/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java @@ -0,0 +1,166 @@ +package org.redisson.spring.data.connection; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; +import org.redisson.RedisRunner; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.config.SubscriptionMode; +import org.redisson.connection.balancer.RandomLoadBalancer; +import org.springframework.dao.InvalidDataAccessResourceUsageException; + +import java.io.IOException; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + +@RunWith(Parameterized.class) +public class RedissonClusterConnectionRenameTest { + + @Parameterized.Parameters(name= "{index} - same slot = {0}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {false}, + {true} + }); + } + + @Parameterized.Parameter(0) + public boolean sameSlot; + + static RedissonClient redisson; + static RedissonClusterConnection connection; + static ClusterProcesses process; + + byte[] originalKey = "key".getBytes(); + byte[] newKey = "unset".getBytes(); + byte[] value = "value".getBytes(); + + @BeforeClass + public static void before() throws FailedToStartRedisException, IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setSubscriptionMode(SubscriptionMode.SLAVE) + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + + redisson = Redisson.create(config); + connection = new RedissonClusterConnection(redisson); + } + + @AfterClass + public static void after() { + process.shutdown(); + redisson.shutdown(); + } + + @After + public void cleanup() { + connection.del(originalKey); + connection.del(newKey); + } + + @Test + public void testRename() { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.rename(originalKey, newKey); + + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + } + + @Test + public void testRename_pipeline() { + connection.set(originalKey, value); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.openPipeline(); + assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + } + + protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot) { + int counter = 0; + + byte[] newKey = (new String(originalKey) + counter).getBytes(); + + Integer newKeySlot = connection.clusterGetSlotForKey(newKey); + + while(!newKeySlot.equals(targetSlot)) { + counter++; + newKey = (new String(originalKey) + counter).getBytes(); + newKeySlot = connection.clusterGetSlotForKey(newKey); + } + + return newKey; + } + + @Test + public void testRenameNX() { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + Boolean result = connection.renameNX(originalKey, newKey); + + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + assertThat(result).isTrue(); + + connection.set(originalKey, value); + + result = connection.renameNX(originalKey, newKey); + + assertThat(result).isFalse(); + } + + @Test + public void testRenameNX_pipeline() { + connection.set(originalKey, value); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.openPipeline(); + assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + } + + private Integer getTargetSlot(Integer originalSlot) { + return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1; + } + +} diff --git a/redisson-spring-data/redisson-spring-data-18/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java b/redisson-spring-data/redisson-spring-data-18/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java index df6bc56fa..78068bc94 100644 --- a/redisson-spring-data/redisson-spring-data-18/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java +++ b/redisson-spring-data/redisson-spring-data-18/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java @@ -1,32 +1,34 @@ package org.redisson.spring.data.connection; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import static org.assertj.core.api.Assertions.*; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner; import org.redisson.RedisRunner.FailedToStartRedisException; import org.redisson.Redisson; -import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.balancer.RandomLoadBalancer; +import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.RedisClusterNode; +import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisNode.NodeType; import org.springframework.data.redis.core.types.RedisClientInfo; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.*; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + public class RedissonClusterConnectionTest { static RedissonClient redisson; @@ -218,5 +220,12 @@ public class RedissonClusterConnectionTest { RedisClusterNode master = map.keySet().iterator().next(); return master; } - + + @Test + public void testConnectionFactoryReturnsClusterConnection() { + RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson); + + assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class); + } + } diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java index e5c3b0d7a..b9715d451 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -17,7 +17,6 @@ package org.redisson.spring.data.connection; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -39,6 +38,7 @@ import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; import org.redisson.connection.MasterSlaveEntry; +import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.RedisClusterConnection; import org.springframework.data.redis.connection.RedisClusterNode; @@ -382,4 +382,66 @@ public class RedissonClusterConnection extends RedissonConnection implements Red return CONVERTER.convert(list.toArray(new String[list.size()])); } + @Override + public void rename(byte[] oldName, byte[] newName) { + + if (isPipelined()) { + throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline"); + } + + if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) { + super.rename(oldName, newName); + return; + } + + final byte[] value = dump(oldName); + + if (null != value) { + + final Long sourceTtlInSeconds = ttl(oldName); + + final long ttlInMilliseconds; + if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) { + ttlInMilliseconds = sourceTtlInSeconds * 1000; + } else { + ttlInMilliseconds = 0; + } + + restore(newName, ttlInMilliseconds, value); + del(oldName); + } + } + + @Override + public Boolean renameNX(byte[] oldName, byte[] newName) { + if (isPipelined()) { + throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline"); + } + + if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) { + return super.renameNX(oldName, newName); + } + + final byte[] value = dump(oldName); + + if (null != value && !exists(newName)) { + + final Long sourceTtlInSeconds = ttl(oldName); + + final long ttlInMilliseconds; + if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) { + ttlInMilliseconds = sourceTtlInSeconds * 1000; + } else { + ttlInMilliseconds = 0; + } + + restore(newName, ttlInMilliseconds, value); + del(oldName); + + return true; + } + + return false; + } + } diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java index 4ed48ceea..7b4a465ad 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java @@ -100,6 +100,9 @@ public class RedissonConnectionFactory implements RedisConnectionFactory, @Override public RedisConnection getConnection() { + if (redisson.getConfig().isClusterConfig()) { + return new RedissonClusterConnection(redisson); + } return new RedissonConnection(redisson); } diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java index 887f5b422..001e1901e 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java @@ -17,8 +17,10 @@ package org.redisson.spring.data.connection; import java.nio.ByteBuffer; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; +import org.reactivestreams.Publisher; import org.redisson.api.RFuture; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -26,13 +28,18 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.MasterSlaveEntry; import org.redisson.reactive.CommandReactiveExecutor; import org.springframework.data.redis.connection.ReactiveClusterKeyCommands; +import org.springframework.data.redis.connection.ReactiveRedisConnection; +import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; import org.springframework.data.redis.connection.RedisClusterNode; +import org.springframework.util.Assert; import io.netty.util.CharsetUtil; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; /** - * + * * @author Nikita Koksharov * */ @@ -59,4 +66,71 @@ public class RedissonReactiveClusterKeyCommands extends RedissonReactiveKeyComma return m.map(v -> ByteBuffer.wrap(v)); } + @Override + public Flux> rename(Publisher commands) { + + return execute(commands, command -> { + Assert.notNull(command.getKey(), "Key must not be null!"); + Assert.notNull(command.getNewName(), "New name must not be null!"); + + byte[] keyBuf = toByteArray(command.getKey()); + byte[] newKeyBuf = toByteArray(command.getNewName()); + + if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) { + return super.rename(commands); + } + + return read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf) + .filter(Objects::nonNull) + .zipWith( + Mono.defer(() -> pTtl(command.getKey()) + .filter(Objects::nonNull) + .map(ttl -> Math.max(0, ttl)) + .switchIfEmpty(Mono.just(0L)) + ) + ) + .flatMap(valueAndTtl -> { + return write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1()); + }) + .thenReturn(new BooleanResponse<>(command, true)) + .doOnSuccess((ignored) -> del(command.getKey())); + }); + } + + @Override + public Flux> renameNX(Publisher commands) { + return execute(commands, command -> { + Assert.notNull(command.getKey(), "Key must not be null!"); + Assert.notNull(command.getNewName(), "New name must not be null!"); + + byte[] keyBuf = toByteArray(command.getKey()); + byte[] newKeyBuf = toByteArray(command.getNewName()); + + if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) { + return super.renameNX(commands); + } + + return exists(command.getNewName()) + .zipWith(read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf)) + .filter(newKeyExistsAndDump -> !newKeyExistsAndDump.getT1() && Objects.nonNull(newKeyExistsAndDump.getT2())) + .map(Tuple2::getT2) + .zipWhen(value -> + pTtl(command.getKey()) + .filter(Objects::nonNull) + .map(ttl -> Math.max(0, ttl)) + .switchIfEmpty(Mono.just(0L)) + + ) + .flatMap(valueAndTtl -> write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1()) + .then(Mono.just(true))) + .switchIfEmpty(Mono.just(false)) + .doOnSuccess(didRename -> { + if (didRename) { + del(command.getKey()); + } + }) + .map(didRename -> new BooleanResponse<>(command, didRename)); + }); + } + } diff --git a/redisson-spring-data/redisson-spring-data-20/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java b/redisson-spring-data/redisson-spring-data-20/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java new file mode 100644 index 000000000..ed8a07bb7 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-20/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java @@ -0,0 +1,166 @@ +package org.redisson.spring.data.connection; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; +import org.redisson.RedisRunner; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.config.SubscriptionMode; +import org.redisson.connection.balancer.RandomLoadBalancer; +import org.springframework.dao.InvalidDataAccessResourceUsageException; + +import java.io.IOException; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + +@RunWith(Parameterized.class) +public class RedissonClusterConnectionRenameTest { + + @Parameterized.Parameters(name= "{index} - same slot = {0}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {false}, + {true} + }); + } + + @Parameterized.Parameter(0) + public boolean sameSlot; + + static RedissonClient redisson; + static RedissonClusterConnection connection; + static ClusterProcesses process; + + byte[] originalKey = "key".getBytes(); + byte[] newKey = "unset".getBytes(); + byte[] value = "value".getBytes(); + + @BeforeClass + public static void before() throws FailedToStartRedisException, IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setSubscriptionMode(SubscriptionMode.SLAVE) + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + + redisson = Redisson.create(config); + connection = new RedissonClusterConnection(redisson); + } + + @AfterClass + public static void after() { + process.shutdown(); + redisson.shutdown(); + } + + @After + public void cleanup() { + connection.del(originalKey); + connection.del(newKey); + } + + @Test + public void testRename() { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.rename(originalKey, newKey); + + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + } + + @Test + public void testRename_pipeline() { + connection.set(originalKey, value); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.openPipeline(); + assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + } + + protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot) { + int counter = 0; + + byte[] newKey = (new String(originalKey) + counter).getBytes(); + + Integer newKeySlot = connection.clusterGetSlotForKey(newKey); + + while(!newKeySlot.equals(targetSlot)) { + counter++; + newKey = (new String(originalKey) + counter).getBytes(); + newKeySlot = connection.clusterGetSlotForKey(newKey); + } + + return newKey; + } + + @Test + public void testRenameNX() { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + Boolean result = connection.renameNX(originalKey, newKey); + + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + assertThat(result).isTrue(); + + connection.set(originalKey, value); + + result = connection.renameNX(originalKey, newKey); + + assertThat(result).isFalse(); + } + + @Test + public void testRenameNX_pipeline() { + connection.set(originalKey, value); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.openPipeline(); + assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + } + + private Integer getTargetSlot(Integer originalSlot) { + return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1; + } + +} diff --git a/redisson-spring-data/redisson-spring-data-20/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java b/redisson-spring-data/redisson-spring-data-20/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java index ca6ddb40b..9b9a26021 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java +++ b/redisson-spring-data/redisson-spring-data-20/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java @@ -1,32 +1,32 @@ package org.redisson.spring.data.connection; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import static org.assertj.core.api.Assertions.*; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner; import org.redisson.RedisRunner.FailedToStartRedisException; import org.redisson.Redisson; -import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; -import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.balancer.RandomLoadBalancer; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.RedisClusterNode; +import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisNode.NodeType; import org.springframework.data.redis.core.types.RedisClientInfo; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + public class RedissonClusterConnectionTest { static RedissonClient redisson; @@ -127,8 +127,8 @@ public class RedissonClusterConnectionTest { public void testClusterGetClusterInfo() { ClusterInfo info = connection.clusterGetClusterInfo(); assertThat(info.getSlotsFail()).isEqualTo(0); - assertThat(info.getSlotsOk()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT); - assertThat(info.getSlotsAssigned()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT); + assertThat(info.getSlotsOk()).isEqualTo(MAX_SLOT); + assertThat(info.getSlotsAssigned()).isEqualTo(MAX_SLOT); } @Test @@ -218,5 +218,12 @@ public class RedissonClusterConnectionTest { RedisClusterNode master = map.keySet().iterator().next(); return master; } - + + @Test + public void testConnectionFactoryReturnsClusterConnection() { + RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson); + + assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class); + } + } diff --git a/redisson-spring-data/redisson-spring-data-20/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java b/redisson-spring-data/redisson-spring-data-20/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java new file mode 100644 index 000000000..c2388f3af --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-20/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java @@ -0,0 +1,196 @@ +package org.redisson.spring.data.connection; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; +import org.redisson.RedisRunner; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.Redisson; +import org.redisson.RedissonKeys; +import org.redisson.api.RedissonClient; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.config.Config; +import org.redisson.config.SubscriptionMode; +import org.redisson.connection.balancer.RandomLoadBalancer; +import org.redisson.reactive.CommandReactiveService; +import org.springframework.data.redis.RedisSystemException; + + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + +@RunWith(Parameterized.class) +public class RedissonReactiveClusterKeyCommandsTest { + + @Parameterized.Parameters(name= "{index} - same slot = {0}; has ttl = {1}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {false, false}, + {true, false}, + {false, true}, + {true, true} + }); + } + + @Parameterized.Parameter(0) + public boolean sameSlot; + + @Parameterized.Parameter(1) + public boolean hasTtl; + + static RedissonClient redisson; + static RedissonReactiveRedisClusterConnection connection; + static ClusterProcesses process; + + ByteBuffer originalKey = ByteBuffer.wrap("key".getBytes()); + ByteBuffer newKey = ByteBuffer.wrap("unset".getBytes()); + ByteBuffer value = ByteBuffer.wrap("value".getBytes()); + + @BeforeClass + public static void before() throws FailedToStartRedisException, IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setSubscriptionMode(SubscriptionMode.SLAVE) + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + + redisson = Redisson.create(config); + connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys) redisson.getKeys()).getConnectionManager())); + } + + @AfterClass + public static void after() { + process.shutdown(); + redisson.shutdown(); + } + + @After + public void cleanup() { + connection.keyCommands().del(originalKey) + .and(connection.keyCommands().del(newKey)) + .block(); + } + + @Test + public void testRename() { + connection.stringCommands().set(originalKey, value).block(); + + if (hasTtl) { + connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block(); + } + + Integer originalSlot = getSlotForKey(originalKey); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + + Boolean response = connection.keyCommands().rename(originalKey, newKey).block(); + + assertThat(response).isTrue(); + + final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block(); + assertThat(newKeyValue).isEqualTo(value); + if (hasTtl) { + assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0); + } else { + assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1); + } + } + + @Test + public void testRename_keyNotExist() { + Integer originalSlot = getSlotForKey(originalKey); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + + if (sameSlot) { + // This is a quirk of the implementation - since same-slot renames use the non-cluster version, + // the result is a Redis error. This behavior matches other spring-data-redis implementations + assertThatThrownBy(() -> connection.keyCommands().rename(originalKey, newKey).block()) + .isInstanceOf(RedisSystemException.class); + + } else { + Boolean response = connection.keyCommands().rename(originalKey, newKey).block(); + + assertThat(response).isTrue(); + + final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block(); + assertThat(newKeyValue).isEqualTo(null); + } + } + + protected ByteBuffer getNewKeyForSlot(String originalKey, Integer targetSlot) { + int counter = 0; + + ByteBuffer newKey = ByteBuffer.wrap((originalKey + counter).getBytes()); + + Integer newKeySlot = getSlotForKey(newKey); + + while(!newKeySlot.equals(targetSlot)) { + counter++; + newKey = ByteBuffer.wrap((originalKey + counter).getBytes()); + newKeySlot = getSlotForKey(newKey); + } + + return newKey; + } + + @Test + public void testRenameNX() { + connection.stringCommands().set(originalKey, value).block(); + if (hasTtl) { + connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block(); + } + + Integer originalSlot = getSlotForKey(originalKey); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + + Boolean result = connection.keyCommands().renameNX(originalKey, newKey).block(); + + assertThat(result).isTrue(); + assertThat(connection.stringCommands().get(newKey).block()).isEqualTo(value); + if (hasTtl) { + assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0); + } else { + assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1); + } + + connection.stringCommands().set(originalKey, value).block(); + + result = connection.keyCommands().renameNX(originalKey, newKey).block(); + + assertThat(result).isFalse(); + } + + private Integer getTargetSlot(Integer originalSlot) { + return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1; + } + + private Integer getSlotForKey(ByteBuffer key) { + return (Integer) connection.read(null, StringCodec.INSTANCE, RedisCommands.KEYSLOT, key.array()).block(); + } + +} diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java index 1b3a4ee02..c01302a12 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -40,6 +40,7 @@ import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; import org.redisson.connection.MasterSlaveEntry; +import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.DefaultedRedisClusterConnection; import org.springframework.data.redis.connection.RedisClusterNode; @@ -430,4 +431,66 @@ public class RedissonClusterConnection extends RedissonConnection implements Def }.open(); } + @Override + public void rename(byte[] oldName, byte[] newName) { + + if (isPipelined()) { + throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline"); + } + + if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) { + super.rename(oldName, newName); + return; + } + + final byte[] value = dump(oldName); + + if (null != value) { + + final Long sourceTtlInSeconds = ttl(oldName); + + final long ttlInMilliseconds; + if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) { + ttlInMilliseconds = sourceTtlInSeconds * 1000; + } else { + ttlInMilliseconds = 0; + } + + restore(newName, ttlInMilliseconds, value); + del(oldName); + } + } + + @Override + public Boolean renameNX(byte[] oldName, byte[] newName) { + if (isPipelined()) { + throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline"); + } + + if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) { + return super.renameNX(oldName, newName); + } + + final byte[] value = dump(oldName); + + if (null != value && !exists(newName)) { + + final Long sourceTtlInSeconds = ttl(oldName); + + final long ttlInMilliseconds; + if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) { + ttlInMilliseconds = sourceTtlInSeconds * 1000; + } else { + ttlInMilliseconds = 0; + } + + restore(newName, ttlInMilliseconds, value); + del(oldName); + + return true; + } + + return false; + } + } diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java index 4ed48ceea..7b4a465ad 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java @@ -100,6 +100,9 @@ public class RedissonConnectionFactory implements RedisConnectionFactory, @Override public RedisConnection getConnection() { + if (redisson.getConfig().isClusterConfig()) { + return new RedissonClusterConnection(redisson); + } return new RedissonConnection(redisson); } diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java index 887f5b422..001e1901e 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java @@ -17,8 +17,10 @@ package org.redisson.spring.data.connection; import java.nio.ByteBuffer; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; +import org.reactivestreams.Publisher; import org.redisson.api.RFuture; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -26,13 +28,18 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.MasterSlaveEntry; import org.redisson.reactive.CommandReactiveExecutor; import org.springframework.data.redis.connection.ReactiveClusterKeyCommands; +import org.springframework.data.redis.connection.ReactiveRedisConnection; +import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; import org.springframework.data.redis.connection.RedisClusterNode; +import org.springframework.util.Assert; import io.netty.util.CharsetUtil; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; /** - * + * * @author Nikita Koksharov * */ @@ -59,4 +66,71 @@ public class RedissonReactiveClusterKeyCommands extends RedissonReactiveKeyComma return m.map(v -> ByteBuffer.wrap(v)); } + @Override + public Flux> rename(Publisher commands) { + + return execute(commands, command -> { + Assert.notNull(command.getKey(), "Key must not be null!"); + Assert.notNull(command.getNewName(), "New name must not be null!"); + + byte[] keyBuf = toByteArray(command.getKey()); + byte[] newKeyBuf = toByteArray(command.getNewName()); + + if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) { + return super.rename(commands); + } + + return read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf) + .filter(Objects::nonNull) + .zipWith( + Mono.defer(() -> pTtl(command.getKey()) + .filter(Objects::nonNull) + .map(ttl -> Math.max(0, ttl)) + .switchIfEmpty(Mono.just(0L)) + ) + ) + .flatMap(valueAndTtl -> { + return write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1()); + }) + .thenReturn(new BooleanResponse<>(command, true)) + .doOnSuccess((ignored) -> del(command.getKey())); + }); + } + + @Override + public Flux> renameNX(Publisher commands) { + return execute(commands, command -> { + Assert.notNull(command.getKey(), "Key must not be null!"); + Assert.notNull(command.getNewName(), "New name must not be null!"); + + byte[] keyBuf = toByteArray(command.getKey()); + byte[] newKeyBuf = toByteArray(command.getNewName()); + + if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) { + return super.renameNX(commands); + } + + return exists(command.getNewName()) + .zipWith(read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf)) + .filter(newKeyExistsAndDump -> !newKeyExistsAndDump.getT1() && Objects.nonNull(newKeyExistsAndDump.getT2())) + .map(Tuple2::getT2) + .zipWhen(value -> + pTtl(command.getKey()) + .filter(Objects::nonNull) + .map(ttl -> Math.max(0, ttl)) + .switchIfEmpty(Mono.just(0L)) + + ) + .flatMap(valueAndTtl -> write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1()) + .then(Mono.just(true))) + .switchIfEmpty(Mono.just(false)) + .doOnSuccess(didRename -> { + if (didRename) { + del(command.getKey()); + } + }) + .map(didRename -> new BooleanResponse<>(command, didRename)); + }); + } + } diff --git a/redisson-spring-data/redisson-spring-data-21/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java b/redisson-spring-data/redisson-spring-data-21/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java new file mode 100644 index 000000000..ed8a07bb7 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-21/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java @@ -0,0 +1,166 @@ +package org.redisson.spring.data.connection; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; +import org.redisson.RedisRunner; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.config.SubscriptionMode; +import org.redisson.connection.balancer.RandomLoadBalancer; +import org.springframework.dao.InvalidDataAccessResourceUsageException; + +import java.io.IOException; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + +@RunWith(Parameterized.class) +public class RedissonClusterConnectionRenameTest { + + @Parameterized.Parameters(name= "{index} - same slot = {0}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {false}, + {true} + }); + } + + @Parameterized.Parameter(0) + public boolean sameSlot; + + static RedissonClient redisson; + static RedissonClusterConnection connection; + static ClusterProcesses process; + + byte[] originalKey = "key".getBytes(); + byte[] newKey = "unset".getBytes(); + byte[] value = "value".getBytes(); + + @BeforeClass + public static void before() throws FailedToStartRedisException, IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setSubscriptionMode(SubscriptionMode.SLAVE) + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + + redisson = Redisson.create(config); + connection = new RedissonClusterConnection(redisson); + } + + @AfterClass + public static void after() { + process.shutdown(); + redisson.shutdown(); + } + + @After + public void cleanup() { + connection.del(originalKey); + connection.del(newKey); + } + + @Test + public void testRename() { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.rename(originalKey, newKey); + + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + } + + @Test + public void testRename_pipeline() { + connection.set(originalKey, value); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.openPipeline(); + assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + } + + protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot) { + int counter = 0; + + byte[] newKey = (new String(originalKey) + counter).getBytes(); + + Integer newKeySlot = connection.clusterGetSlotForKey(newKey); + + while(!newKeySlot.equals(targetSlot)) { + counter++; + newKey = (new String(originalKey) + counter).getBytes(); + newKeySlot = connection.clusterGetSlotForKey(newKey); + } + + return newKey; + } + + @Test + public void testRenameNX() { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + Boolean result = connection.renameNX(originalKey, newKey); + + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + assertThat(result).isTrue(); + + connection.set(originalKey, value); + + result = connection.renameNX(originalKey, newKey); + + assertThat(result).isFalse(); + } + + @Test + public void testRenameNX_pipeline() { + connection.set(originalKey, value); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.openPipeline(); + assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + } + + private Integer getTargetSlot(Integer originalSlot) { + return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1; + } + +} diff --git a/redisson-spring-data/redisson-spring-data-21/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java b/redisson-spring-data/redisson-spring-data-21/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java index ca6ddb40b..a47b7f183 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java +++ b/redisson-spring-data/redisson-spring-data-21/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java @@ -1,32 +1,34 @@ package org.redisson.spring.data.connection; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import static org.assertj.core.api.Assertions.*; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner; import org.redisson.RedisRunner.FailedToStartRedisException; import org.redisson.Redisson; -import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.balancer.RandomLoadBalancer; +import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.RedisClusterNode; +import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisNode.NodeType; import org.springframework.data.redis.core.types.RedisClientInfo; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.*; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + public class RedissonClusterConnectionTest { static RedissonClient redisson; @@ -218,5 +220,12 @@ public class RedissonClusterConnectionTest { RedisClusterNode master = map.keySet().iterator().next(); return master; } - + + @Test + public void testConnectionFactoryReturnsClusterConnection() { + RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson); + + assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class); + } + } diff --git a/redisson-spring-data/redisson-spring-data-21/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java b/redisson-spring-data/redisson-spring-data-21/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java new file mode 100644 index 000000000..1f5126810 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-21/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java @@ -0,0 +1,195 @@ +package org.redisson.spring.data.connection; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; +import org.redisson.RedisRunner; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.Redisson; +import org.redisson.RedissonKeys; +import org.redisson.api.RedissonClient; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.config.Config; +import org.redisson.config.SubscriptionMode; +import org.redisson.connection.balancer.RandomLoadBalancer; +import org.redisson.reactive.CommandReactiveService; +import org.springframework.data.redis.RedisSystemException; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + +@RunWith(Parameterized.class) +public class RedissonReactiveClusterKeyCommandsTest { + + @Parameterized.Parameters(name= "{index} - same slot = {0}; has ttl = {1}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {false, false}, + {true, false}, + {false, true}, + {true, true} + }); + } + + @Parameterized.Parameter(0) + public boolean sameSlot; + + @Parameterized.Parameter(1) + public boolean hasTtl; + + static RedissonClient redisson; + static RedissonReactiveRedisClusterConnection connection; + static ClusterProcesses process; + + ByteBuffer originalKey = ByteBuffer.wrap("key".getBytes()); + ByteBuffer newKey = ByteBuffer.wrap("unset".getBytes()); + ByteBuffer value = ByteBuffer.wrap("value".getBytes()); + + @BeforeClass + public static void before() throws FailedToStartRedisException, IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setSubscriptionMode(SubscriptionMode.SLAVE) + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + + redisson = Redisson.create(config); + connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys) redisson.getKeys()).getConnectionManager())); + } + + @AfterClass + public static void after() { + process.shutdown(); + redisson.shutdown(); + } + + @After + public void cleanup() { + connection.keyCommands().del(originalKey) + .and(connection.keyCommands().del(newKey)) + .block(); + } + + @Test + public void testRename() { + connection.stringCommands().set(originalKey, value).block(); + + if (hasTtl) { + connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block(); + } + + Integer originalSlot = getSlotForKey(originalKey); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + + Boolean response = connection.keyCommands().rename(originalKey, newKey).block(); + + assertThat(response).isTrue(); + + final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block(); + assertThat(newKeyValue).isEqualTo(value); + if (hasTtl) { + assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0); + } else { + assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1); + } + } + + @Test + public void testRename_keyNotExist() { + Integer originalSlot = getSlotForKey(originalKey); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + + if (sameSlot) { + // This is a quirk of the implementation - since same-slot renames use the non-cluster version, + // the result is a Redis error. This behavior matches other spring-data-redis implementations + assertThatThrownBy(() -> connection.keyCommands().rename(originalKey, newKey).block()) + .isInstanceOf(RedisSystemException.class); + + } else { + Boolean response = connection.keyCommands().rename(originalKey, newKey).block(); + + assertThat(response).isTrue(); + + final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block(); + assertThat(newKeyValue).isEqualTo(null); + } + } + + protected ByteBuffer getNewKeyForSlot(String originalKey, Integer targetSlot) { + int counter = 0; + + ByteBuffer newKey = ByteBuffer.wrap((originalKey + counter).getBytes()); + + Integer newKeySlot = getSlotForKey(newKey); + + while(!newKeySlot.equals(targetSlot)) { + counter++; + newKey = ByteBuffer.wrap((originalKey + counter).getBytes()); + newKeySlot = getSlotForKey(newKey); + } + + return newKey; + } + + @Test + public void testRenameNX() { + connection.stringCommands().set(originalKey, value).block(); + if (hasTtl) { + connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block(); + } + + Integer originalSlot = getSlotForKey(originalKey); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + + Boolean result = connection.keyCommands().renameNX(originalKey, newKey).block(); + + assertThat(result).isTrue(); + assertThat(connection.stringCommands().get(newKey).block()).isEqualTo(value); + if (hasTtl) { + assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0); + } else { + assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1); + } + + connection.stringCommands().set(originalKey, value).block(); + + result = connection.keyCommands().renameNX(originalKey, newKey).block(); + + assertThat(result).isFalse(); + } + + private Integer getTargetSlot(Integer originalSlot) { + return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1; + } + + private Integer getSlotForKey(ByteBuffer key) { + return (Integer) connection.read(null, StringCodec.INSTANCE, RedisCommands.KEYSLOT, key.array()).block(); + } + +} diff --git a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java index 1b3a4ee02..c01302a12 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -40,6 +40,7 @@ import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; import org.redisson.connection.MasterSlaveEntry; +import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.DefaultedRedisClusterConnection; import org.springframework.data.redis.connection.RedisClusterNode; @@ -430,4 +431,66 @@ public class RedissonClusterConnection extends RedissonConnection implements Def }.open(); } + @Override + public void rename(byte[] oldName, byte[] newName) { + + if (isPipelined()) { + throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline"); + } + + if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) { + super.rename(oldName, newName); + return; + } + + final byte[] value = dump(oldName); + + if (null != value) { + + final Long sourceTtlInSeconds = ttl(oldName); + + final long ttlInMilliseconds; + if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) { + ttlInMilliseconds = sourceTtlInSeconds * 1000; + } else { + ttlInMilliseconds = 0; + } + + restore(newName, ttlInMilliseconds, value); + del(oldName); + } + } + + @Override + public Boolean renameNX(byte[] oldName, byte[] newName) { + if (isPipelined()) { + throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline"); + } + + if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) { + return super.renameNX(oldName, newName); + } + + final byte[] value = dump(oldName); + + if (null != value && !exists(newName)) { + + final Long sourceTtlInSeconds = ttl(oldName); + + final long ttlInMilliseconds; + if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) { + ttlInMilliseconds = sourceTtlInSeconds * 1000; + } else { + ttlInMilliseconds = 0; + } + + restore(newName, ttlInMilliseconds, value); + del(oldName); + + return true; + } + + return false; + } + } diff --git a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java index 53c2c4e09..161ea4627 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java +++ b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java @@ -106,6 +106,9 @@ public class RedissonConnectionFactory implements RedisConnectionFactory, @Override public RedisConnection getConnection() { + if (redisson.getConfig().isClusterConfig()) { + return new RedissonClusterConnection(redisson); + } return new RedissonConnection(redisson); } diff --git a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java index 887f5b422..001e1901e 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java @@ -17,8 +17,10 @@ package org.redisson.spring.data.connection; import java.nio.ByteBuffer; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; +import org.reactivestreams.Publisher; import org.redisson.api.RFuture; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -26,13 +28,18 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.MasterSlaveEntry; import org.redisson.reactive.CommandReactiveExecutor; import org.springframework.data.redis.connection.ReactiveClusterKeyCommands; +import org.springframework.data.redis.connection.ReactiveRedisConnection; +import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; import org.springframework.data.redis.connection.RedisClusterNode; +import org.springframework.util.Assert; import io.netty.util.CharsetUtil; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; /** - * + * * @author Nikita Koksharov * */ @@ -59,4 +66,71 @@ public class RedissonReactiveClusterKeyCommands extends RedissonReactiveKeyComma return m.map(v -> ByteBuffer.wrap(v)); } + @Override + public Flux> rename(Publisher commands) { + + return execute(commands, command -> { + Assert.notNull(command.getKey(), "Key must not be null!"); + Assert.notNull(command.getNewName(), "New name must not be null!"); + + byte[] keyBuf = toByteArray(command.getKey()); + byte[] newKeyBuf = toByteArray(command.getNewName()); + + if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) { + return super.rename(commands); + } + + return read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf) + .filter(Objects::nonNull) + .zipWith( + Mono.defer(() -> pTtl(command.getKey()) + .filter(Objects::nonNull) + .map(ttl -> Math.max(0, ttl)) + .switchIfEmpty(Mono.just(0L)) + ) + ) + .flatMap(valueAndTtl -> { + return write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1()); + }) + .thenReturn(new BooleanResponse<>(command, true)) + .doOnSuccess((ignored) -> del(command.getKey())); + }); + } + + @Override + public Flux> renameNX(Publisher commands) { + return execute(commands, command -> { + Assert.notNull(command.getKey(), "Key must not be null!"); + Assert.notNull(command.getNewName(), "New name must not be null!"); + + byte[] keyBuf = toByteArray(command.getKey()); + byte[] newKeyBuf = toByteArray(command.getNewName()); + + if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) { + return super.renameNX(commands); + } + + return exists(command.getNewName()) + .zipWith(read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf)) + .filter(newKeyExistsAndDump -> !newKeyExistsAndDump.getT1() && Objects.nonNull(newKeyExistsAndDump.getT2())) + .map(Tuple2::getT2) + .zipWhen(value -> + pTtl(command.getKey()) + .filter(Objects::nonNull) + .map(ttl -> Math.max(0, ttl)) + .switchIfEmpty(Mono.just(0L)) + + ) + .flatMap(valueAndTtl -> write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1()) + .then(Mono.just(true))) + .switchIfEmpty(Mono.just(false)) + .doOnSuccess(didRename -> { + if (didRename) { + del(command.getKey()); + } + }) + .map(didRename -> new BooleanResponse<>(command, didRename)); + }); + } + } diff --git a/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java b/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java new file mode 100644 index 000000000..ed8a07bb7 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java @@ -0,0 +1,166 @@ +package org.redisson.spring.data.connection; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; +import org.redisson.RedisRunner; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.config.SubscriptionMode; +import org.redisson.connection.balancer.RandomLoadBalancer; +import org.springframework.dao.InvalidDataAccessResourceUsageException; + +import java.io.IOException; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + +@RunWith(Parameterized.class) +public class RedissonClusterConnectionRenameTest { + + @Parameterized.Parameters(name= "{index} - same slot = {0}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {false}, + {true} + }); + } + + @Parameterized.Parameter(0) + public boolean sameSlot; + + static RedissonClient redisson; + static RedissonClusterConnection connection; + static ClusterProcesses process; + + byte[] originalKey = "key".getBytes(); + byte[] newKey = "unset".getBytes(); + byte[] value = "value".getBytes(); + + @BeforeClass + public static void before() throws FailedToStartRedisException, IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setSubscriptionMode(SubscriptionMode.SLAVE) + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + + redisson = Redisson.create(config); + connection = new RedissonClusterConnection(redisson); + } + + @AfterClass + public static void after() { + process.shutdown(); + redisson.shutdown(); + } + + @After + public void cleanup() { + connection.del(originalKey); + connection.del(newKey); + } + + @Test + public void testRename() { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.rename(originalKey, newKey); + + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + } + + @Test + public void testRename_pipeline() { + connection.set(originalKey, value); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.openPipeline(); + assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + } + + protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot) { + int counter = 0; + + byte[] newKey = (new String(originalKey) + counter).getBytes(); + + Integer newKeySlot = connection.clusterGetSlotForKey(newKey); + + while(!newKeySlot.equals(targetSlot)) { + counter++; + newKey = (new String(originalKey) + counter).getBytes(); + newKeySlot = connection.clusterGetSlotForKey(newKey); + } + + return newKey; + } + + @Test + public void testRenameNX() { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + Boolean result = connection.renameNX(originalKey, newKey); + + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + assertThat(result).isTrue(); + + connection.set(originalKey, value); + + result = connection.renameNX(originalKey, newKey); + + assertThat(result).isFalse(); + } + + @Test + public void testRenameNX_pipeline() { + connection.set(originalKey, value); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.openPipeline(); + assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + } + + private Integer getTargetSlot(Integer originalSlot) { + return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1; + } + +} diff --git a/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java b/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java index ca6ddb40b..a47b7f183 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java +++ b/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java @@ -1,32 +1,34 @@ package org.redisson.spring.data.connection; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import static org.assertj.core.api.Assertions.*; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner; import org.redisson.RedisRunner.FailedToStartRedisException; import org.redisson.Redisson; -import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.balancer.RandomLoadBalancer; +import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.RedisClusterNode; +import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisNode.NodeType; import org.springframework.data.redis.core.types.RedisClientInfo; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.*; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + public class RedissonClusterConnectionTest { static RedissonClient redisson; @@ -218,5 +220,12 @@ public class RedissonClusterConnectionTest { RedisClusterNode master = map.keySet().iterator().next(); return master; } - + + @Test + public void testConnectionFactoryReturnsClusterConnection() { + RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson); + + assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class); + } + } diff --git a/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java b/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java new file mode 100644 index 000000000..1f5126810 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java @@ -0,0 +1,195 @@ +package org.redisson.spring.data.connection; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; +import org.redisson.RedisRunner; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.Redisson; +import org.redisson.RedissonKeys; +import org.redisson.api.RedissonClient; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.config.Config; +import org.redisson.config.SubscriptionMode; +import org.redisson.connection.balancer.RandomLoadBalancer; +import org.redisson.reactive.CommandReactiveService; +import org.springframework.data.redis.RedisSystemException; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + +@RunWith(Parameterized.class) +public class RedissonReactiveClusterKeyCommandsTest { + + @Parameterized.Parameters(name= "{index} - same slot = {0}; has ttl = {1}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {false, false}, + {true, false}, + {false, true}, + {true, true} + }); + } + + @Parameterized.Parameter(0) + public boolean sameSlot; + + @Parameterized.Parameter(1) + public boolean hasTtl; + + static RedissonClient redisson; + static RedissonReactiveRedisClusterConnection connection; + static ClusterProcesses process; + + ByteBuffer originalKey = ByteBuffer.wrap("key".getBytes()); + ByteBuffer newKey = ByteBuffer.wrap("unset".getBytes()); + ByteBuffer value = ByteBuffer.wrap("value".getBytes()); + + @BeforeClass + public static void before() throws FailedToStartRedisException, IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setSubscriptionMode(SubscriptionMode.SLAVE) + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + + redisson = Redisson.create(config); + connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys) redisson.getKeys()).getConnectionManager())); + } + + @AfterClass + public static void after() { + process.shutdown(); + redisson.shutdown(); + } + + @After + public void cleanup() { + connection.keyCommands().del(originalKey) + .and(connection.keyCommands().del(newKey)) + .block(); + } + + @Test + public void testRename() { + connection.stringCommands().set(originalKey, value).block(); + + if (hasTtl) { + connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block(); + } + + Integer originalSlot = getSlotForKey(originalKey); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + + Boolean response = connection.keyCommands().rename(originalKey, newKey).block(); + + assertThat(response).isTrue(); + + final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block(); + assertThat(newKeyValue).isEqualTo(value); + if (hasTtl) { + assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0); + } else { + assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1); + } + } + + @Test + public void testRename_keyNotExist() { + Integer originalSlot = getSlotForKey(originalKey); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + + if (sameSlot) { + // This is a quirk of the implementation - since same-slot renames use the non-cluster version, + // the result is a Redis error. This behavior matches other spring-data-redis implementations + assertThatThrownBy(() -> connection.keyCommands().rename(originalKey, newKey).block()) + .isInstanceOf(RedisSystemException.class); + + } else { + Boolean response = connection.keyCommands().rename(originalKey, newKey).block(); + + assertThat(response).isTrue(); + + final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block(); + assertThat(newKeyValue).isEqualTo(null); + } + } + + protected ByteBuffer getNewKeyForSlot(String originalKey, Integer targetSlot) { + int counter = 0; + + ByteBuffer newKey = ByteBuffer.wrap((originalKey + counter).getBytes()); + + Integer newKeySlot = getSlotForKey(newKey); + + while(!newKeySlot.equals(targetSlot)) { + counter++; + newKey = ByteBuffer.wrap((originalKey + counter).getBytes()); + newKeySlot = getSlotForKey(newKey); + } + + return newKey; + } + + @Test + public void testRenameNX() { + connection.stringCommands().set(originalKey, value).block(); + if (hasTtl) { + connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block(); + } + + Integer originalSlot = getSlotForKey(originalKey); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + + Boolean result = connection.keyCommands().renameNX(originalKey, newKey).block(); + + assertThat(result).isTrue(); + assertThat(connection.stringCommands().get(newKey).block()).isEqualTo(value); + if (hasTtl) { + assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0); + } else { + assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1); + } + + connection.stringCommands().set(originalKey, value).block(); + + result = connection.keyCommands().renameNX(originalKey, newKey).block(); + + assertThat(result).isFalse(); + } + + private Integer getTargetSlot(Integer originalSlot) { + return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1; + } + + private Integer getSlotForKey(ByteBuffer key) { + return (Integer) connection.read(null, StringCodec.INSTANCE, RedisCommands.KEYSLOT, key.array()).block(); + } + +} diff --git a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java index 1b3a4ee02..c01302a12 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -40,6 +40,7 @@ import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; import org.redisson.connection.MasterSlaveEntry; +import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.DefaultedRedisClusterConnection; import org.springframework.data.redis.connection.RedisClusterNode; @@ -430,4 +431,66 @@ public class RedissonClusterConnection extends RedissonConnection implements Def }.open(); } + @Override + public void rename(byte[] oldName, byte[] newName) { + + if (isPipelined()) { + throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline"); + } + + if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) { + super.rename(oldName, newName); + return; + } + + final byte[] value = dump(oldName); + + if (null != value) { + + final Long sourceTtlInSeconds = ttl(oldName); + + final long ttlInMilliseconds; + if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) { + ttlInMilliseconds = sourceTtlInSeconds * 1000; + } else { + ttlInMilliseconds = 0; + } + + restore(newName, ttlInMilliseconds, value); + del(oldName); + } + } + + @Override + public Boolean renameNX(byte[] oldName, byte[] newName) { + if (isPipelined()) { + throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline"); + } + + if (redisson.getConnectionManager().calcSlot(oldName) == redisson.getConnectionManager().calcSlot(newName)) { + return super.renameNX(oldName, newName); + } + + final byte[] value = dump(oldName); + + if (null != value && !exists(newName)) { + + final Long sourceTtlInSeconds = ttl(oldName); + + final long ttlInMilliseconds; + if (null != sourceTtlInSeconds && sourceTtlInSeconds > 0) { + ttlInMilliseconds = sourceTtlInSeconds * 1000; + } else { + ttlInMilliseconds = 0; + } + + restore(newName, ttlInMilliseconds, value); + del(oldName); + + return true; + } + + return false; + } + } diff --git a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java index 53c2c4e09..161ea4627 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java +++ b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java @@ -106,6 +106,9 @@ public class RedissonConnectionFactory implements RedisConnectionFactory, @Override public RedisConnection getConnection() { + if (redisson.getConfig().isClusterConfig()) { + return new RedissonClusterConnection(redisson); + } return new RedissonConnection(redisson); } diff --git a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java index 887f5b422..001e1901e 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java +++ b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommands.java @@ -17,8 +17,10 @@ package org.redisson.spring.data.connection; import java.nio.ByteBuffer; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; +import org.reactivestreams.Publisher; import org.redisson.api.RFuture; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -26,13 +28,18 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.MasterSlaveEntry; import org.redisson.reactive.CommandReactiveExecutor; import org.springframework.data.redis.connection.ReactiveClusterKeyCommands; +import org.springframework.data.redis.connection.ReactiveRedisConnection; +import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse; import org.springframework.data.redis.connection.RedisClusterNode; +import org.springframework.util.Assert; import io.netty.util.CharsetUtil; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; /** - * + * * @author Nikita Koksharov * */ @@ -59,4 +66,71 @@ public class RedissonReactiveClusterKeyCommands extends RedissonReactiveKeyComma return m.map(v -> ByteBuffer.wrap(v)); } + @Override + public Flux> rename(Publisher commands) { + + return execute(commands, command -> { + Assert.notNull(command.getKey(), "Key must not be null!"); + Assert.notNull(command.getNewName(), "New name must not be null!"); + + byte[] keyBuf = toByteArray(command.getKey()); + byte[] newKeyBuf = toByteArray(command.getNewName()); + + if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) { + return super.rename(commands); + } + + return read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf) + .filter(Objects::nonNull) + .zipWith( + Mono.defer(() -> pTtl(command.getKey()) + .filter(Objects::nonNull) + .map(ttl -> Math.max(0, ttl)) + .switchIfEmpty(Mono.just(0L)) + ) + ) + .flatMap(valueAndTtl -> { + return write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1()); + }) + .thenReturn(new BooleanResponse<>(command, true)) + .doOnSuccess((ignored) -> del(command.getKey())); + }); + } + + @Override + public Flux> renameNX(Publisher commands) { + return execute(commands, command -> { + Assert.notNull(command.getKey(), "Key must not be null!"); + Assert.notNull(command.getNewName(), "New name must not be null!"); + + byte[] keyBuf = toByteArray(command.getKey()); + byte[] newKeyBuf = toByteArray(command.getNewName()); + + if (executorService.getConnectionManager().calcSlot(keyBuf) == executorService.getConnectionManager().calcSlot(newKeyBuf)) { + return super.renameNX(commands); + } + + return exists(command.getNewName()) + .zipWith(read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, keyBuf)) + .filter(newKeyExistsAndDump -> !newKeyExistsAndDump.getT1() && Objects.nonNull(newKeyExistsAndDump.getT2())) + .map(Tuple2::getT2) + .zipWhen(value -> + pTtl(command.getKey()) + .filter(Objects::nonNull) + .map(ttl -> Math.max(0, ttl)) + .switchIfEmpty(Mono.just(0L)) + + ) + .flatMap(valueAndTtl -> write(newKeyBuf, StringCodec.INSTANCE, RedisCommands.RESTORE, newKeyBuf, valueAndTtl.getT2(), valueAndTtl.getT1()) + .then(Mono.just(true))) + .switchIfEmpty(Mono.just(false)) + .doOnSuccess(didRename -> { + if (didRename) { + del(command.getKey()); + } + }) + .map(didRename -> new BooleanResponse<>(command, didRename)); + }); + } + } diff --git a/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java b/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java new file mode 100644 index 000000000..ed8a07bb7 --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionRenameTest.java @@ -0,0 +1,166 @@ +package org.redisson.spring.data.connection; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; +import org.redisson.RedisRunner; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.config.SubscriptionMode; +import org.redisson.connection.balancer.RandomLoadBalancer; +import org.springframework.dao.InvalidDataAccessResourceUsageException; + +import java.io.IOException; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + +@RunWith(Parameterized.class) +public class RedissonClusterConnectionRenameTest { + + @Parameterized.Parameters(name= "{index} - same slot = {0}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {false}, + {true} + }); + } + + @Parameterized.Parameter(0) + public boolean sameSlot; + + static RedissonClient redisson; + static RedissonClusterConnection connection; + static ClusterProcesses process; + + byte[] originalKey = "key".getBytes(); + byte[] newKey = "unset".getBytes(); + byte[] value = "value".getBytes(); + + @BeforeClass + public static void before() throws FailedToStartRedisException, IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setSubscriptionMode(SubscriptionMode.SLAVE) + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + + redisson = Redisson.create(config); + connection = new RedissonClusterConnection(redisson); + } + + @AfterClass + public static void after() { + process.shutdown(); + redisson.shutdown(); + } + + @After + public void cleanup() { + connection.del(originalKey); + connection.del(newKey); + } + + @Test + public void testRename() { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.rename(originalKey, newKey); + + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + } + + @Test + public void testRename_pipeline() { + connection.set(originalKey, value); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.openPipeline(); + assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + } + + protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot) { + int counter = 0; + + byte[] newKey = (new String(originalKey) + counter).getBytes(); + + Integer newKeySlot = connection.clusterGetSlotForKey(newKey); + + while(!newKeySlot.equals(targetSlot)) { + counter++; + newKey = (new String(originalKey) + counter).getBytes(); + newKeySlot = connection.clusterGetSlotForKey(newKey); + } + + return newKey; + } + + @Test + public void testRenameNX() { + connection.set(originalKey, value); + connection.expire(originalKey, 1000); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + Boolean result = connection.renameNX(originalKey, newKey); + + assertThat(connection.get(newKey)).isEqualTo(value); + assertThat(connection.ttl(newKey)).isGreaterThan(0); + assertThat(result).isTrue(); + + connection.set(originalKey, value); + + result = connection.renameNX(originalKey, newKey); + + assertThat(result).isFalse(); + } + + @Test + public void testRenameNX_pipeline() { + connection.set(originalKey, value); + + Integer originalSlot = connection.clusterGetSlotForKey(originalKey); + newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot)); + + connection.openPipeline(); + assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class); + connection.closePipeline(); + } + + private Integer getTargetSlot(Integer originalSlot) { + return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1; + } + +} diff --git a/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java b/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java index ca6ddb40b..a47b7f183 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java +++ b/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java @@ -1,32 +1,34 @@ package org.redisson.spring.data.connection; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import static org.assertj.core.api.Assertions.*; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner; import org.redisson.RedisRunner.FailedToStartRedisException; import org.redisson.Redisson; -import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.balancer.RandomLoadBalancer; +import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.RedisClusterNode; +import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisNode.NodeType; import org.springframework.data.redis.core.types.RedisClientInfo; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.*; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + public class RedissonClusterConnectionTest { static RedissonClient redisson; @@ -218,5 +220,12 @@ public class RedissonClusterConnectionTest { RedisClusterNode master = map.keySet().iterator().next(); return master; } - + + @Test + public void testConnectionFactoryReturnsClusterConnection() { + RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson); + + assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class); + } + } diff --git a/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java b/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java new file mode 100644 index 000000000..c2388f3af --- /dev/null +++ b/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonReactiveClusterKeyCommandsTest.java @@ -0,0 +1,196 @@ +package org.redisson.spring.data.connection; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.redisson.ClusterRunner; +import org.redisson.ClusterRunner.ClusterProcesses; +import org.redisson.RedisRunner; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.Redisson; +import org.redisson.RedissonKeys; +import org.redisson.api.RedissonClient; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.config.Config; +import org.redisson.config.SubscriptionMode; +import org.redisson.connection.balancer.RandomLoadBalancer; +import org.redisson.reactive.CommandReactiveService; +import org.springframework.data.redis.RedisSystemException; + + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; + +@RunWith(Parameterized.class) +public class RedissonReactiveClusterKeyCommandsTest { + + @Parameterized.Parameters(name= "{index} - same slot = {0}; has ttl = {1}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {false, false}, + {true, false}, + {false, true}, + {true, true} + }); + } + + @Parameterized.Parameter(0) + public boolean sameSlot; + + @Parameterized.Parameter(1) + public boolean hasTtl; + + static RedissonClient redisson; + static RedissonReactiveRedisClusterConnection connection; + static ClusterProcesses process; + + ByteBuffer originalKey = ByteBuffer.wrap("key".getBytes()); + ByteBuffer newKey = ByteBuffer.wrap("unset".getBytes()); + ByteBuffer value = ByteBuffer.wrap("value".getBytes()); + + @BeforeClass + public static void before() throws FailedToStartRedisException, IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setSubscriptionMode(SubscriptionMode.SLAVE) + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + + redisson = Redisson.create(config); + connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys) redisson.getKeys()).getConnectionManager())); + } + + @AfterClass + public static void after() { + process.shutdown(); + redisson.shutdown(); + } + + @After + public void cleanup() { + connection.keyCommands().del(originalKey) + .and(connection.keyCommands().del(newKey)) + .block(); + } + + @Test + public void testRename() { + connection.stringCommands().set(originalKey, value).block(); + + if (hasTtl) { + connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block(); + } + + Integer originalSlot = getSlotForKey(originalKey); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + + Boolean response = connection.keyCommands().rename(originalKey, newKey).block(); + + assertThat(response).isTrue(); + + final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block(); + assertThat(newKeyValue).isEqualTo(value); + if (hasTtl) { + assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0); + } else { + assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1); + } + } + + @Test + public void testRename_keyNotExist() { + Integer originalSlot = getSlotForKey(originalKey); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + + if (sameSlot) { + // This is a quirk of the implementation - since same-slot renames use the non-cluster version, + // the result is a Redis error. This behavior matches other spring-data-redis implementations + assertThatThrownBy(() -> connection.keyCommands().rename(originalKey, newKey).block()) + .isInstanceOf(RedisSystemException.class); + + } else { + Boolean response = connection.keyCommands().rename(originalKey, newKey).block(); + + assertThat(response).isTrue(); + + final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block(); + assertThat(newKeyValue).isEqualTo(null); + } + } + + protected ByteBuffer getNewKeyForSlot(String originalKey, Integer targetSlot) { + int counter = 0; + + ByteBuffer newKey = ByteBuffer.wrap((originalKey + counter).getBytes()); + + Integer newKeySlot = getSlotForKey(newKey); + + while(!newKeySlot.equals(targetSlot)) { + counter++; + newKey = ByteBuffer.wrap((originalKey + counter).getBytes()); + newKeySlot = getSlotForKey(newKey); + } + + return newKey; + } + + @Test + public void testRenameNX() { + connection.stringCommands().set(originalKey, value).block(); + if (hasTtl) { + connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block(); + } + + Integer originalSlot = getSlotForKey(originalKey); + newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot)); + + Boolean result = connection.keyCommands().renameNX(originalKey, newKey).block(); + + assertThat(result).isTrue(); + assertThat(connection.stringCommands().get(newKey).block()).isEqualTo(value); + if (hasTtl) { + assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0); + } else { + assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1); + } + + connection.stringCommands().set(originalKey, value).block(); + + result = connection.keyCommands().renameNX(originalKey, newKey).block(); + + assertThat(result).isFalse(); + } + + private Integer getTargetSlot(Integer originalSlot) { + return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1; + } + + private Integer getSlotForKey(ByteBuffer key) { + return (Integer) connection.read(null, StringCodec.INSTANCE, RedisCommands.KEYSLOT, key.array()).block(); + } + +}