Fixed - Spring Data Redis RedissonConnection.del() method doesn't work in pipeline on Redis cluster. #3237

pull/3248/head
Nikita Koksharov 4 years ago
parent 4cdfd46042
commit 683b0dfce2

@ -507,6 +507,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
@Override
public Long del(byte[]... keys) {
if (isQueueing() || isPipelined()) {
for (byte[] key: keys) {
write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
}
return null;
}
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
}

@ -506,6 +506,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
@Override
public Long del(byte[]... keys) {
if (isQueueing() || isPipelined()) {
for (byte[] key: keys) {
write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
}
return null;
}
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
}

@ -511,6 +511,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
@Override
public Long del(byte[]... keys) {
if (isQueueing() || isPipelined()) {
for (byte[] key: keys) {
write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
}
return null;
}
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
}

@ -560,6 +560,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
@Override
public Long del(byte[]... keys) {
if (isQueueing() || isPipelined()) {
for (byte[] key: keys) {
write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
}
return null;
}
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
}

@ -560,6 +560,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
@Override
public Long del(byte[]... keys) {
if (isQueueing() || isPipelined()) {
for (byte[] key: keys) {
write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
}
return null;
}
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
}

@ -560,6 +560,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
@Override
public Long del(byte[]... keys) {
if (isQueueing() || isPipelined()) {
for (byte[] key: keys) {
write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
}
return null;
}
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
}

@ -560,6 +560,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
@Override
public Long del(byte[]... keys) {
if (isQueueing() || isPipelined()) {
for (byte[] key: keys) {
write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
}
return null;
}
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
}

@ -21,10 +21,7 @@ import org.springframework.data.redis.connection.RedisNode.NodeType;
import org.springframework.data.redis.core.types.RedisClientInfo;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.*;
import static org.assertj.core.api.Assertions.*;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@ -34,7 +31,7 @@ public class RedissonClusterConnectionTest {
static RedissonClient redisson;
static RedissonClusterConnection connection;
static ClusterProcesses process;
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
@ -44,29 +41,29 @@ public class RedissonClusterConnectionTest {
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonClusterConnection(redisson);
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@Test
public void testClusterGetNodes() {
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
@ -105,26 +102,26 @@ public class RedissonClusterConnectionTest {
assertThat(slaves).hasSize(1);
}
}
@Test
public void testClusterGetSlotForKey() {
Integer slot = connection.clusterGetSlotForKey("123".getBytes());
assertThat(slot).isNotNull();
}
@Test
public void testClusterGetNodeForSlot() {
RedisClusterNode node1 = connection.clusterGetNodeForSlot(1);
RedisClusterNode node2 = connection.clusterGetNodeForSlot(16000);
assertThat(node1.getId()).isNotEqualTo(node2.getId());
}
@Test
public void testClusterGetNodeForKey() {
RedisClusterNode node = connection.clusterGetNodeForKey("123".getBytes());
assertThat(node).isNotNull();
}
@Test
public void testClusterGetClusterInfo() {
ClusterInfo info = connection.clusterGetClusterInfo();
@ -132,7 +129,7 @@ public class RedissonClusterConnectionTest {
assertThat(info.getSlotsOk()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT);
assertThat(info.getSlotsAssigned()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT);
}
@Test
public void testClusterAddRemoveSlots() {
RedisClusterNode master = getFirstMaster();
@ -140,7 +137,7 @@ public class RedissonClusterConnectionTest {
connection.clusterDeleteSlots(master, slot);
connection.clusterAddSlots(master, slot);
}
@Test
public void testClusterCountKeysInSlot() {
Long t = connection.clusterCountKeysInSlot(1);
@ -153,7 +150,7 @@ public class RedissonClusterConnectionTest {
connection.clusterForget(master);
connection.clusterMeet(master);
}
@Test
public void testClusterGetKeysInSlot() {
List<byte[]> keys = connection.clusterGetKeysInSlot(12, 10);
@ -181,33 +178,59 @@ public class RedissonClusterConnectionTest {
assertThat(info.size()).isGreaterThan(10);
}
@Test
public void testDelPipeline() {
byte[] k = "key".getBytes();
byte[] v = "val".getBytes();
connection.set(k, v);
connection.openPipeline();
connection.get(k);
connection.del(k);
List<Object> results = connection.closePipeline();
byte[] val = (byte[])results.get(0);
assertThat(val).isEqualTo(v);
Long res = (Long) results.get(1);
assertThat(res).isEqualTo(1);
}
@Test
public void testDel() {
List<byte[]> keys = new ArrayList<>();
for (int i = 0; i < 10; i++) {
byte[] key = ("test" + i).getBytes();
keys.add(key);
connection.set(key, ("test" + i).getBytes());
}
connection.del(keys.toArray(new byte[0][]));
}
@Test
public void testResetConfigStats() {
RedisClusterNode master = getFirstMaster();
connection.resetConfigStats(master);
}
@Test
public void testTime() {
RedisClusterNode master = getFirstMaster();
Long time = connection.time(master);
assertThat(time).isGreaterThan(1000);
}
@Test
public void testGetClientList() {
RedisClusterNode master = getFirstMaster();
List<RedisClientInfo> list = connection.getClientList(master);
assertThat(list.size()).isGreaterThan(10);
}
@Test
public void testSetConfig() {
RedisClusterNode master = getFirstMaster();
connection.setConfig(master, "timeout", "10");
}
@Test
public void testGetConfig() {
RedisClusterNode master = getFirstMaster();

Loading…
Cancel
Save