Fixed - ReactiveKeyCommands.pExpire() throws an exception. #5640

pull/5644/head
Nikita Koksharov 12 months ago
parent d4074e8afb
commit 6278653d05

@ -66,6 +66,12 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>

@ -15,11 +15,6 @@
*/
package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -30,16 +25,15 @@ import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.reactive.CommandReactiveExecutor;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.*;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
/**
*
* @author Nikita Koksharov
@ -180,7 +174,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements
Assert.notNull(command.getKey(), "Key must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf);
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis());
return m.map(v -> new BooleanResponse<>(command, v));
});
}

@ -15,12 +15,6 @@
*/
package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -32,18 +26,19 @@ import org.redisson.reactive.CommandReactiveExecutor;
import org.redisson.reactive.RedissonKeysReactive;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.*;
import org.springframework.data.redis.connection.ValueEncoding;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
*
* @author Nikita Koksharov
@ -228,7 +223,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements
Assert.notNull(command.getKey(), "Key must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf);
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis());
return m.map(v -> new BooleanResponse<>(command, v));
});
}

@ -15,12 +15,6 @@
*/
package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -32,18 +26,19 @@ import org.redisson.reactive.CommandReactiveExecutor;
import org.redisson.reactive.RedissonKeysReactive;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.*;
import org.springframework.data.redis.connection.ValueEncoding;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
*
* @author Nikita Koksharov
@ -228,7 +223,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements
Assert.notNull(command.getKey(), "Key must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf);
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis());
return m.map(v -> new BooleanResponse<>(command, v));
});
}

@ -15,12 +15,6 @@
*/
package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -32,18 +26,19 @@ import org.redisson.reactive.CommandReactiveExecutor;
import org.redisson.reactive.RedissonKeysReactive;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.*;
import org.springframework.data.redis.connection.ValueEncoding;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
*
* @author Nikita Koksharov
@ -228,7 +223,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements
Assert.notNull(command.getKey(), "Key must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf);
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis());
return m.map(v -> new BooleanResponse<>(command, v));
});
}

@ -15,12 +15,6 @@
*/
package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -32,18 +26,19 @@ import org.redisson.reactive.CommandReactiveExecutor;
import org.redisson.reactive.RedissonKeysReactive;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.*;
import org.springframework.data.redis.connection.ValueEncoding;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
*
* @author Nikita Koksharov
@ -228,7 +223,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements
Assert.notNull(command.getKey(), "Key must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf);
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis());
return m.map(v -> new BooleanResponse<>(command, v));
});
}

@ -15,12 +15,6 @@
*/
package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -32,18 +26,19 @@ import org.redisson.reactive.CommandReactiveExecutor;
import org.redisson.reactive.RedissonKeysReactive;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.*;
import org.springframework.data.redis.connection.ValueEncoding;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
*
* @author Nikita Koksharov
@ -228,7 +223,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements
Assert.notNull(command.getKey(), "Key must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf);
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis());
return m.map(v -> new BooleanResponse<>(command, v));
});
}

@ -15,13 +15,6 @@
*/
package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -33,18 +26,20 @@ import org.redisson.reactive.CommandReactiveExecutor;
import org.redisson.reactive.RedissonKeysReactive;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.*;
import org.springframework.data.redis.connection.ValueEncoding;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
*
* @author Nikita Koksharov
@ -229,7 +224,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements
Assert.notNull(command.getKey(), "Key must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf);
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis());
return m.map(v -> new BooleanResponse<>(command, v));
});
}

@ -15,13 +15,6 @@
*/
package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -33,18 +26,20 @@ import org.redisson.reactive.CommandReactiveExecutor;
import org.redisson.reactive.RedissonKeysReactive;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.*;
import org.springframework.data.redis.connection.ValueEncoding;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
*
* @author Nikita Koksharov
@ -229,7 +224,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements
Assert.notNull(command.getKey(), "Key must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf);
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis());
return m.map(v -> new BooleanResponse<>(command, v));
});
}

@ -15,13 +15,6 @@
*/
package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -33,18 +26,20 @@ import org.redisson.reactive.CommandReactiveExecutor;
import org.redisson.reactive.RedissonKeysReactive;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.*;
import org.springframework.data.redis.connection.ValueEncoding;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
*
* @author Nikita Koksharov
@ -229,7 +224,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements
Assert.notNull(command.getKey(), "Key must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf);
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis());
return m.map(v -> new BooleanResponse<>(command, v));
});
}

@ -15,13 +15,6 @@
*/
package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -33,18 +26,20 @@ import org.redisson.reactive.CommandReactiveExecutor;
import org.redisson.reactive.RedissonKeysReactive;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.*;
import org.springframework.data.redis.connection.ValueEncoding;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
*
* @author Nikita Koksharov
@ -229,7 +224,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements
Assert.notNull(command.getKey(), "Key must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf);
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis());
return m.map(v -> new BooleanResponse<>(command, v));
});
}

@ -15,15 +15,11 @@
*/
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.connection.ServiceManager;
import org.redisson.misc.RedisURI;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisClusterNode.Flag;
@ -32,8 +28,8 @@ import org.springframework.data.redis.connection.RedisClusterNode.RedisClusterNo
import org.springframework.data.redis.connection.RedisClusterNode.SlotRange;
import org.springframework.data.redis.connection.RedisNode.NodeType;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import java.io.IOException;
import java.util.*;
/**
*
@ -42,6 +38,12 @@ import io.netty.util.CharsetUtil;
*/
public class RedisClusterNodeDecoder implements Decoder<List<RedisClusterNode>> {
private final ServiceManager serviceManager;
public RedisClusterNodeDecoder(ServiceManager serviceManager) {
this.serviceManager = serviceManager;
}
@Override
public List<RedisClusterNode> decode(ByteBuf buf, State state) throws IOException {
String response = buf.toString(CharsetUtil.UTF_8);
@ -63,7 +65,14 @@ public class RedisClusterNodeDecoder implements Decoder<List<RedisClusterNode>>
RedisURI address = null;
if (!flags.contains(Flag.NOADDR)) {
String addr = params[1].split("@")[0];
String name = addr.substring(0, addr.lastIndexOf(":"));
if (name.isEmpty()) {
// skip nodes with empty address
continue;
}
address = new RedisURI("redis://" + addr);
address = serviceManager.toURI("redis", address.getHost(), String.valueOf(address.getPort()));
}
String masterId = params[3];

@ -15,7 +15,6 @@
*/
package org.redisson.spring.data.connection;
import io.netty.util.CharsetUtil;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
@ -57,16 +56,16 @@ import java.util.stream.Collectors;
*/
public class RedissonClusterConnection extends RedissonConnection implements RedisClusterConnection, DefaultedRedisClusterConnection {
private static final RedisStrictCommand<List<RedisClusterNode>> CLUSTER_NODES =
new RedisStrictCommand<List<RedisClusterNode>>("CLUSTER", "NODES", new ObjectDecoder(new RedisClusterNodeDecoder()));
public RedissonClusterConnection(RedissonClient redisson) {
super(redisson);
}
@Override
public Iterable<RedisClusterNode> clusterGetNodes() {
return read(null, StringCodec.INSTANCE, CLUSTER_NODES);
RedisStrictCommand<List<RedisClusterNode>> cluster
= new RedisStrictCommand<List<RedisClusterNode>>("CLUSTER", "NODES",
new ObjectDecoder(new RedisClusterNodeDecoder(executorService.getServiceManager())));
return read(null, StringCodec.INSTANCE, cluster);
}
@Override
@ -100,7 +99,7 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
public Map<RedisClusterNode, Collection<RedisClusterNode>> clusterGetMasterReplicaMap() {
Iterable<RedisClusterNode> res = clusterGetNodes();
Set<RedisClusterNode> masters = new HashSet<RedisClusterNode>();
Set<RedisClusterNode> masters = new HashSet<>();
for (Iterator<RedisClusterNode> iterator = res.iterator(); iterator.hasNext();) {
RedisClusterNode redisClusterNode = iterator.next();
if (redisClusterNode.isMaster()) {

@ -15,13 +15,6 @@
*/
package org.redisson.spring.data.connection;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
@ -33,18 +26,20 @@ import org.redisson.reactive.CommandReactiveExecutor;
import org.redisson.reactive.RedissonKeysReactive;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.*;
import org.springframework.data.redis.connection.ValueEncoding;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
*
* @author Nikita Koksharov
@ -227,9 +222,10 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements
return execute(commands, command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getTimeout(), "Timeout must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf);
Mono<Boolean> m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.PEXPIRE, keyBuf, command.getTimeout().toMillis());
return m.map(v -> new BooleanResponse<>(command, v));
});
}

@ -102,12 +102,13 @@ public class RedissonReactiveRedisClusterConnection extends RedissonReactiveRedi
return execute(node, RedisCommands.PING);
}
private static final RedisStrictCommand<List<RedisClusterNode>> CLUSTER_NODES =
new RedisStrictCommand<>("CLUSTER", "NODES", new ObjectDecoder(new RedisClusterNodeDecoder()));
@Override
public Flux<RedisClusterNode> clusterGetNodes() {
Mono<List<RedisClusterNode>> result = read(null, StringCodec.INSTANCE, CLUSTER_NODES);
RedisStrictCommand<List<RedisClusterNode>> cluster
= new RedisStrictCommand<List<RedisClusterNode>>("CLUSTER", "NODES",
new ObjectDecoder(new RedisClusterNodeDecoder(executorService.getServiceManager())));
Mono<List<RedisClusterNode>> result = read(null, StringCodec.INSTANCE, cluster);
return result.flatMapMany(e -> Flux.fromIterable(e));
}

@ -1,86 +1,140 @@
package org.redisson;
import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.redisson.api.NatMapper;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.Protocol;
import org.redisson.misc.RedisURI;
import org.redisson.spring.data.connection.RedissonClusterConnection;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
public abstract class BaseTest {
protected RedissonClient redisson;
protected static RedissonClient defaultRedisson;
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
defaultRedisson = createInstance();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
defaultRedisson.shutdown();
try {
RedisRunner.shutDownDefaultRedisServerInstance();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
protected static final String NOTIFY_KEYSPACE_EVENTS = "--notify-keyspace-events";
private static final GenericContainer<?> REDIS = createRedis();
protected static final Protocol protocol = Protocol.RESP2;
protected static RedissonClient redisson;
protected static RedissonClient redissonCluster;
private static GenericContainer<?> REDIS_CLUSTER;
protected static GenericContainer<?> createRedis() {
return createRedis("7.2");
}
@Before
public void before() throws IOException, InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
RedisRunner.startDefaultRedisServerInstance();
redisson = createInstance();
} else {
if (redisson == null) {
redisson = defaultRedisson;
}
if (flushBetweenTests()) {
redisson.getKeys().flushall();
}
}
protected static GenericContainer<?> createRedis(String version) {
return new GenericContainer<>("redis:" + version)
.withCreateContainerCmdModifier(cmd -> {
cmd.withCmd("redis-server", "--save", "''");
})
.withExposedPorts(6379);
}
@After
public void after() throws InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
redisson.shutdown();
RedisRunner.shutDownDefaultRedisServerInstance();
@BeforeClass
public static void beforeAll() {
if (redisson == null) {
REDIS.start();
Config config = createConfig();
redisson = Redisson.create(config);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
redisson.shutdown();
REDIS.stop();
if (redissonCluster != null) {
redissonCluster.shutdown();
redissonCluster = null;
}
if (REDIS_CLUSTER != null) {
REDIS_CLUSTER.stop();
REDIS_CLUSTER = null;
}
}));
}
}
public static Config createConfig() {
// String redisAddress = System.getProperty("redisAddress");
// if (redisAddress == null) {
// redisAddress = "127.0.0.1:6379";
// }
protected static Config createConfig() {
Config config = new Config();
// config.setCodec(new MsgPackJacksonCodec());
// config.useSentinelServers().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26379", "127.0.0.1:26389");
// config.useClusterServers().addNodeAddress("127.0.0.1:7004", "127.0.0.1:7001", "127.0.0.1:7000");
config.setProtocol(protocol);
config.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
// .setPassword("mypass1");
// config.useMasterSlaveConnection()
// .setMasterAddress("127.0.0.1:6379")
// .addSlaveAddress("127.0.0.1:6399")
// .addSlaveAddress("127.0.0.1:6389");
.setAddress("redis://127.0.0.1:" + REDIS.getFirstMappedPort());
return config;
}
public static RedissonClient createInstance() {
protected static RedissonClient createInstance() {
Config config = createConfig();
return Redisson.create(config);
}
protected boolean flushBetweenTests() {
return true;
protected void testWithParams(Consumer<RedissonClient> redissonCallback, String... params) {
GenericContainer<?> redis =
new GenericContainer<>("redis:7.2")
.withCreateContainerCmdModifier(cmd -> {
List<String> args = new ArrayList<>();
args.add("redis-server");
args.addAll(Arrays.asList(params));
cmd.withCmd(args);
})
.withExposedPorts(6379);
redis.start();
Config config = new Config();
config.setProtocol(protocol);
config.useSingleServer().setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort());
RedissonClient redisson = Redisson.create(config);
try {
redissonCallback.accept(redisson);
} finally {
redisson.shutdown();
redis.stop();
}
}
protected void testInCluster(Consumer<RedissonClusterConnection> redissonCallback) {
if (redissonCluster == null) {
REDIS_CLUSTER = new GenericContainer<>("vishnunair/docker-redis-cluster")
.withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384)
.withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(15)));
REDIS_CLUSTER.start();
Config config = new Config();
config.setProtocol(protocol);
config.useClusterServers()
.setNatMapper(new NatMapper() {
@Override
public RedisURI map(RedisURI uri) {
if (REDIS_CLUSTER.getMappedPort(uri.getPort()) == null) {
return uri;
}
return new RedisURI(uri.getScheme(), REDIS_CLUSTER.getHost(), REDIS_CLUSTER.getMappedPort(uri.getPort()));
}
})
.addNodeAddress("redis://127.0.0.1:" + REDIS_CLUSTER.getFirstMappedPort());
redissonCluster = Redisson.create(config);
}
redissonCallback.accept(new RedissonClusterConnection(redissonCluster));
}
@Before
public void beforeEach() {
redisson.getKeys().flushall();
if (redissonCluster != null) {
redissonCluster.getKeys().flushall();
}
}
}

@ -1,31 +1,19 @@
package org.redisson.spring.data.connection;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.BaseTest;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import java.io.IOException;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@RunWith(Parameterized.class)
public class RedissonClusterConnectionRenameTest {
public class RedissonClusterConnectionRenameTest extends BaseTest {
@Parameterized.Parameters(name= "{index} - same slot = {0}")
public static Iterable<Object[]> data() {
@ -38,79 +26,41 @@ public class RedissonClusterConnectionRenameTest {
@Parameterized.Parameter(0)
public boolean sameSlot;
static RedissonClient redisson;
static RedissonClusterConnection connection;
static ClusterProcesses process;
byte[] originalKey = "key".getBytes();
byte[] newKey = "unset".getBytes();
byte[] value = "value".getBytes();
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonClusterConnection(redisson);
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@After
public void cleanup() {
connection.del(originalKey);
connection.del(newKey);
}
@Test
public void testRename() {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
testInCluster(connection -> {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection);
connection.rename(originalKey, newKey);
connection.rename(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
});
}
@Test
public void testRename_pipeline() {
connection.set(originalKey, value);
testInCluster(connection -> {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection);
connection.openPipeline();
assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
connection.openPipeline();
assertThatThrownBy(() -> connection.rename(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
});
}
protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot) {
protected byte[] getNewKeyForSlot(byte[] originalKey, Integer targetSlot, RedissonClusterConnection connection) {
int counter = 0;
byte[] newKey = (new String(originalKey) + counter).getBytes();
@ -128,35 +78,39 @@ public class RedissonClusterConnectionRenameTest {
@Test
public void testRenameNX() {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
testInCluster(connection -> {
connection.set(originalKey, value);
connection.expire(originalKey, 1000);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection);
Boolean result = connection.renameNX(originalKey, newKey);
Boolean result = connection.renameNX(originalKey, newKey);
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
assertThat(result).isTrue();
assertThat(connection.get(newKey)).isEqualTo(value);
assertThat(connection.ttl(newKey)).isGreaterThan(0);
assertThat(result).isTrue();
connection.set(originalKey, value);
connection.set(originalKey, value);
result = connection.renameNX(originalKey, newKey);
result = connection.renameNX(originalKey, newKey);
assertThat(result).isFalse();
assertThat(result).isFalse();
});
}
@Test
public void testRenameNX_pipeline() {
connection.set(originalKey, value);
testInCluster(connection -> {
connection.set(originalKey, value);
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot));
Integer originalSlot = connection.clusterGetSlotForKey(originalKey);
newKey = getNewKeyForSlot(originalKey, getTargetSlot(originalSlot), connection);
connection.openPipeline();
assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
connection.openPipeline();
assertThatThrownBy(() -> connection.renameNX(originalKey, newKey)).isInstanceOf(InvalidDataAccessResourceUsageException.class);
connection.closePipeline();
});
}
private Integer getTargetSlot(Integer originalSlot) {

@ -1,20 +1,10 @@
package org.redisson.spring.data.connection;
import net.bytebuddy.utility.RandomString;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.BaseTest;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.ReadMode;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
@ -24,288 +14,291 @@ import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.types.RedisClientInfo;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonClusterConnectionTest {
static RedissonClient redisson;
static RedissonClusterConnection connection;
static ClusterProcesses process;
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setReadMode(ReadMode.MASTER_SLAVE)
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
connection = (RedissonClusterConnection) factory.getConnection();
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
public class RedissonClusterConnectionTest extends BaseTest {
@Test
public void testRandomKey() {
StringRedisTemplate redisTemplate = new StringRedisTemplate();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson));
redisTemplate.afterPropertiesSet();
for (int i = 0; i < 10; i++) {
redisTemplate.opsForValue().set("i" + i, "i" + i);
}
for (RedisClusterNode clusterNode : redisTemplate.getConnectionFactory().getClusterConnection().clusterGetNodes()) {
String key = redisTemplate.opsForCluster().randomKey(clusterNode);
assertThat(key).isNotNull();
}
testInCluster(connection -> {
RedissonClient redisson = (RedissonClient) connection.getNativeConnection();
StringRedisTemplate redisTemplate = new StringRedisTemplate();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(redisson));
redisTemplate.afterPropertiesSet();
for (int i = 0; i < 10; i++) {
redisTemplate.opsForValue().set("i" + i, "i" + i);
}
for (RedisClusterNode clusterNode : redisTemplate.getConnectionFactory().getClusterConnection().clusterGetNodes()) {
String key = redisTemplate.opsForCluster().randomKey(clusterNode);
assertThat(key).isNotNull();
}
});
}
@Test
public void testDel() {
List<byte[]> keys = new ArrayList<>();
for (int i = 0; i < 10; i++) {
byte[] key = ("test" + i).getBytes();
keys.add(key);
connection.set(key, ("test" + i).getBytes());
}
assertThat(connection.del(keys.toArray(new byte[0][]))).isEqualTo(10);
testInCluster(connection -> {
List<byte[]> keys = new ArrayList<>();
for (int i = 0; i < 10; i++) {
byte[] key = ("test" + i).getBytes();
keys.add(key);
connection.set(key, ("test" + i).getBytes());
}
assertThat(connection.del(keys.toArray(new byte[0][]))).isEqualTo(10);
});
}
@Test
public void testScan() {
Map<byte[], byte[]> map = new HashMap<>();
for (int i = 0; i < 10000; i++) {
map.put(RandomString.make(32).getBytes(), RandomString.make(32).getBytes(StandardCharsets.UTF_8));
}
connection.mSet(map);
Cursor<byte[]> b = connection.scan(ScanOptions.scanOptions().build());
Set<String> sett = new HashSet<>();
int counter = 0;
while (b.hasNext()) {
byte[] tt = b.next();
sett.add(new String(tt));
counter++;
}
assertThat(sett.size()).isEqualTo(map.size());
assertThat(counter).isEqualTo(map.size());
testInCluster(connection -> {
Map<byte[], byte[]> map = new HashMap<>();
for (int i = 0; i < 10000; i++) {
map.put(RandomString.make(32).getBytes(), RandomString.make(32).getBytes(StandardCharsets.UTF_8));
}
connection.mSet(map);
Cursor<byte[]> b = connection.scan(ScanOptions.scanOptions().build());
Set<String> sett = new HashSet<>();
int counter = 0;
while (b.hasNext()) {
byte[] tt = b.next();
sett.add(new String(tt));
counter++;
}
assertThat(sett.size()).isEqualTo(map.size());
assertThat(counter).isEqualTo(map.size());
});
}
@Test
public void testMSet() {
Map<byte[], byte[]> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
map.put(("test" + i).getBytes(), ("test" + i*100).getBytes());
}
connection.mSet(map);
for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
assertThat(connection.get(entry.getKey())).isEqualTo(entry.getValue());
}
testInCluster(connection -> {
Map<byte[], byte[]> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
map.put(("test" + i).getBytes(), ("test" + i*100).getBytes());
}
connection.mSet(map);
for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
assertThat(connection.get(entry.getKey())).isEqualTo(entry.getValue());
}
});
}
@Test
public void testMGet() {
Map<byte[], byte[]> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
map.put(("test" + i).getBytes(), ("test" + i*100).getBytes());
}
connection.mSet(map);
List<byte[]> r = connection.mGet(map.keySet().toArray(new byte[0][]));
assertThat(r).containsExactly(map.values().toArray(new byte[0][]));
testInCluster(connection -> {
Map<byte[], byte[]> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
map.put(("test" + i).getBytes(), ("test" + i*100).getBytes());
}
connection.mSet(map);
List<byte[]> r = connection.mGet(map.keySet().toArray(new byte[0][]));
assertThat(r).containsExactly(map.values().toArray(new byte[0][]));
});
}
@Test
public void testClusterGetNodes() {
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
assertThat(nodes).hasSize(6);
for (RedisClusterNode redisClusterNode : nodes) {
assertThat(redisClusterNode.getLinkState()).isNotNull();
assertThat(redisClusterNode.getFlags()).isNotEmpty();
assertThat(redisClusterNode.getHost()).isNotNull();
assertThat(redisClusterNode.getPort()).isNotNull();
assertThat(redisClusterNode.getId()).isNotNull();
assertThat(redisClusterNode.getType()).isNotNull();
if (redisClusterNode.getType() == NodeType.MASTER) {
assertThat(redisClusterNode.getSlotRange().getSlots()).isNotEmpty();
} else {
assertThat(redisClusterNode.getMasterId()).isNotNull();
testInCluster(connection -> {
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
assertThat(nodes).hasSize(6);
for (RedisClusterNode redisClusterNode : nodes) {
assertThat(redisClusterNode.getLinkState()).isNotNull();
assertThat(redisClusterNode.getFlags()).isNotEmpty();
assertThat(redisClusterNode.getHost()).isNotNull();
assertThat(redisClusterNode.getPort()).isNotNull();
assertThat(redisClusterNode.getId()).isNotNull();
assertThat(redisClusterNode.getType()).isNotNull();
if (redisClusterNode.getType() == NodeType.MASTER) {
assertThat(redisClusterNode.getSlotRange().getSlots()).isNotEmpty();
} else {
assertThat(redisClusterNode.getMasterId()).isNotNull();
}
}
}
});
}
@Test
public void testClusterGetNodesMaster() {
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
for (RedisClusterNode redisClusterNode : nodes) {
if (redisClusterNode.getType() == NodeType.MASTER) {
Collection<RedisClusterNode> slaves = connection.clusterGetReplicas(redisClusterNode);
assertThat(slaves).hasSize(1);
testInCluster(connection -> {
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
for (RedisClusterNode redisClusterNode : nodes) {
if (redisClusterNode.getType() == NodeType.MASTER) {
Collection<RedisClusterNode> slaves = connection.clusterGetReplicas(redisClusterNode);
assertThat(slaves).hasSize(1);
}
}
}
});
}
@Test
public void testClusterGetMasterSlaveMap() {
Map<RedisClusterNode, Collection<RedisClusterNode>> map = connection.clusterGetMasterReplicaMap();
assertThat(map).hasSize(3);
for (Collection<RedisClusterNode> slaves : map.values()) {
assertThat(slaves).hasSize(1);
}
testInCluster(connection -> {
Map<RedisClusterNode, Collection<RedisClusterNode>> map = connection.clusterGetMasterReplicaMap();
assertThat(map).hasSize(3);
for (Collection<RedisClusterNode> slaves : map.values()) {
assertThat(slaves).hasSize(1);
}
});
}
@Test
public void testClusterGetSlotForKey() {
Integer slot = connection.clusterGetSlotForKey("123".getBytes());
assertThat(slot).isNotNull();
testInCluster(connection -> {
Integer slot = connection.clusterGetSlotForKey("123".getBytes());
assertThat(slot).isNotNull();
});
}
@Test
public void testClusterGetNodeForSlot() {
RedisClusterNode node1 = connection.clusterGetNodeForSlot(1);
RedisClusterNode node2 = connection.clusterGetNodeForSlot(16000);
assertThat(node1.getId()).isNotEqualTo(node2.getId());
testInCluster(connection -> {
RedisClusterNode node1 = connection.clusterGetNodeForSlot(1);
RedisClusterNode node2 = connection.clusterGetNodeForSlot(16000);
assertThat(node1.getId()).isNotEqualTo(node2.getId());
});
}
@Test
public void testClusterGetNodeForKey() {
RedisClusterNode node = connection.clusterGetNodeForKey("123".getBytes());
assertThat(node).isNotNull();
testInCluster(connection -> {
RedisClusterNode node = connection.clusterGetNodeForKey("123".getBytes());
assertThat(node).isNotNull();
});
}
@Test
public void testClusterGetClusterInfo() {
ClusterInfo info = connection.clusterGetClusterInfo();
assertThat(info.getSlotsFail()).isEqualTo(0);
assertThat(info.getSlotsOk()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT);
assertThat(info.getSlotsAssigned()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT);
testInCluster(connection -> {
ClusterInfo info = connection.clusterGetClusterInfo();
assertThat(info.getSlotsFail()).isEqualTo(0);
assertThat(info.getSlotsOk()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT);
assertThat(info.getSlotsAssigned()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT);
});
}
@Test
public void testClusterAddRemoveSlots() {
RedisClusterNode master = getFirstMaster();
Integer slot = master.getSlotRange().getSlots().iterator().next();
connection.clusterDeleteSlots(master, slot);
connection.clusterAddSlots(master, slot);
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
Integer slot = master.getSlotRange().getSlots().iterator().next();
connection.clusterDeleteSlots(master, slot);
connection.clusterAddSlots(master, slot);
});
}
@Test
public void testClusterCountKeysInSlot() {
Long t = connection.clusterCountKeysInSlot(1);
assertThat(t).isZero();
}
@Test
public void testClusterMeetForget() {
RedisClusterNode master = getFirstMaster();
connection.clusterForget(master);
connection.clusterMeet(master);
testInCluster(connection -> {
Long t = connection.clusterCountKeysInSlot(1);
assertThat(t).isZero();
});
}
@Test
public void testClusterGetKeysInSlot() {
connection.flushAll();
List<byte[]> keys = connection.clusterGetKeysInSlot(12, 10);
assertThat(keys).isEmpty();
testInCluster(connection -> {
connection.flushAll();
List<byte[]> keys = connection.clusterGetKeysInSlot(12, 10);
assertThat(keys).isEmpty();
});
}
@Test
public void testClusterPing() {
RedisClusterNode master = getFirstMaster();
String res = connection.ping(master);
assertThat(res).isEqualTo("PONG");
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
String res = connection.ping(master);
assertThat(res).isEqualTo("PONG");
});
}
@Test
public void testDbSize() {
connection.flushAll();
RedisClusterNode master = getFirstMaster();
Long size = connection.dbSize(master);
assertThat(size).isZero();
testInCluster(connection -> {
connection.flushAll();
RedisClusterNode master = getFirstMaster(connection);
Long size = connection.dbSize(master);
assertThat(size).isZero();
});
}
@Test
public void testInfo() {
RedisClusterNode master = getFirstMaster();
Properties info = connection.info(master);
assertThat(info.size()).isGreaterThan(10);
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
Properties info = connection.info(master);
assertThat(info.size()).isGreaterThan(10);
});
}
@Test
public void testDelPipeline() {
byte[] k = "key".getBytes();
byte[] v = "val".getBytes();
connection.set(k, v);
connection.openPipeline();
connection.get(k);
connection.del(k);
List<Object> results = connection.closePipeline();
byte[] val = (byte[])results.get(0);
assertThat(val).isEqualTo(v);
Long res = (Long) results.get(1);
assertThat(res).isEqualTo(1);
testInCluster(connection -> {
byte[] k = "key".getBytes();
byte[] v = "val".getBytes();
connection.set(k, v);
connection.openPipeline();
connection.get(k);
connection.del(k);
List<Object> results = connection.closePipeline();
byte[] val = (byte[])results.get(0);
assertThat(val).isEqualTo(v);
Long res = (Long) results.get(1);
assertThat(res).isEqualTo(1);
});
}
@Test
public void testResetConfigStats() {
RedisClusterNode master = getFirstMaster();
connection.resetConfigStats(master);
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
connection.resetConfigStats(master);
});
}
@Test
public void testTime() {
RedisClusterNode master = getFirstMaster();
Long time = connection.time(master);
assertThat(time).isGreaterThan(1000);
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
Long time = connection.time(master);
assertThat(time).isGreaterThan(1000);
});
}
@Test
public void testGetClientList() {
RedisClusterNode master = getFirstMaster();
List<RedisClientInfo> list = connection.getClientList(master);
assertThat(list.size()).isGreaterThan(10);
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
List<RedisClientInfo> list = connection.getClientList(master);
assertThat(list.size()).isGreaterThan(10);
});
}
@Test
public void testSetConfig() {
RedisClusterNode master = getFirstMaster();
connection.setConfig(master, "timeout", "10");
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
connection.setConfig(master, "timeout", "10");
});
}
@Test
public void testGetConfig() {
RedisClusterNode master = getFirstMaster();
Properties config = connection.getConfig(master, "*");
assertThat(config.size()).isGreaterThan(20);
testInCluster(connection -> {
RedisClusterNode master = getFirstMaster(connection);
Properties config = connection.getConfig(master, "*");
assertThat(config.size()).isGreaterThan(20);
});
}
protected RedisClusterNode getFirstMaster() {
protected RedisClusterNode getFirstMaster(RedissonClusterConnection connection) {
Map<RedisClusterNode, Collection<RedisClusterNode>> map = connection.clusterGetMasterReplicaMap();
RedisClusterNode master = map.keySet().iterator().next();
return master;
@ -313,9 +306,12 @@ public class RedissonClusterConnectionTest {
@Test
public void testConnectionFactoryReturnsClusterConnection() {
RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson);
testInCluster(connection -> {
RedissonClient redisson = (RedissonClient) connection.getNativeConnection();
RedisConnectionFactory connectionFactory = new RedissonConnectionFactory(redisson);
assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class);
assertThat(connectionFactory.getConnection()).isInstanceOf(RedissonClusterConnection.class);
});
}
}

@ -17,19 +17,21 @@ import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.reactive.CommandReactiveService;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.ReactiveRedisClusterConnection;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
@RunWith(Parameterized.class)
public class RedissonReactiveClusterKeyCommandsTest {
public class RedissonReactiveClusterKeyCommandsTest extends BaseTest {
@Parameterized.Parameters(name= "{index} - same slot = {0}; has ttl = {1}")
public static Iterable<Object[]> data() {
@ -47,109 +49,78 @@ public class RedissonReactiveClusterKeyCommandsTest {
@Parameterized.Parameter(1)
public boolean hasTtl;
static RedissonClient redisson;
static RedissonReactiveRedisClusterConnection connection;
static ClusterProcesses process;
ByteBuffer originalKey = ByteBuffer.wrap("key".getBytes());
ByteBuffer newKey = ByteBuffer.wrap("unset".getBytes());
ByteBuffer value = ByteBuffer.wrap("value".getBytes());
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonReactiveRedisClusterConnection(((RedissonReactive)redisson.reactive()).getCommandExecutor());
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@After
public void cleanup() {
connection.keyCommands().del(originalKey)
.and(connection.keyCommands().del(newKey))
.block();
private void testInClusterReactive(Consumer<ReactiveRedisClusterConnection> redissonCallback) {
testInCluster(c -> {
RedissonClient redisson = (RedissonClient) c.getNativeConnection();
RedissonReactiveRedisClusterConnection connection = new RedissonReactiveRedisClusterConnection(((RedissonReactive) redisson.reactive()).getCommandExecutor());
redissonCallback.accept(connection);
});
}
@Test
public void testRename() {
connection.stringCommands().set(originalKey, value).block();
testInClusterReactive(connection -> {
connection.stringCommands().set(originalKey, value).block();
if (hasTtl) {
connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block();
}
if (hasTtl) {
connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block();
}
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
Integer originalSlot = getSlotForKey(originalKey, (RedissonReactiveRedisClusterConnection) connection);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot), connection);
Boolean response = connection.keyCommands().rename(originalKey, newKey).block();
Boolean response = connection.keyCommands().rename(originalKey, newKey).block();
assertThat(response).isTrue();
assertThat(response).isTrue();
final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block();
assertThat(newKeyValue).isEqualTo(value);
if (hasTtl) {
assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0);
} else {
assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1);
}
final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block();
assertThat(newKeyValue).isEqualTo(value);
if (hasTtl) {
assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0);
} else {
assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1);
}
});
}
@Test
public void testRename_keyNotExist() {
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
testInClusterReactive(connection -> {
Integer originalSlot = getSlotForKey(originalKey, (RedissonReactiveRedisClusterConnection) connection);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot), connection);
if (sameSlot) {
// This is a quirk of the implementation - since same-slot renames use the non-cluster version,
// the result is a Redis error. This behavior matches other spring-data-redis implementations
assertThatThrownBy(() -> connection.keyCommands().rename(originalKey, newKey).block())
.isInstanceOf(RedisSystemException.class);
if (sameSlot) {
// This is a quirk of the implementation - since same-slot renames use the non-cluster version,
// the result is a Redis error. This behavior matches other spring-data-redis implementations
assertThatThrownBy(() -> connection.keyCommands().rename(originalKey, newKey).block())
.isInstanceOf(RedisSystemException.class);
} else {
Boolean response = connection.keyCommands().rename(originalKey, newKey).block();
} else {
Boolean response = connection.keyCommands().rename(originalKey, newKey).block();
assertThat(response).isTrue();
assertThat(response).isTrue();
final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block();
assertThat(newKeyValue).isEqualTo(null);
}
final ByteBuffer newKeyValue = connection.stringCommands().get(newKey).block();
assertThat(newKeyValue).isEqualTo(null);
}
});
}
protected ByteBuffer getNewKeyForSlot(String originalKey, Integer targetSlot) {
protected ByteBuffer getNewKeyForSlot(String originalKey, Integer targetSlot, ReactiveRedisClusterConnection connection) {
int counter = 0;
ByteBuffer newKey = ByteBuffer.wrap((originalKey + counter).getBytes());
Integer newKeySlot = getSlotForKey(newKey);
Integer newKeySlot = getSlotForKey(newKey, (RedissonReactiveRedisClusterConnection) connection);
while(!newKeySlot.equals(targetSlot)) {
counter++;
newKey = ByteBuffer.wrap((originalKey + counter).getBytes());
newKeySlot = getSlotForKey(newKey);
newKeySlot = getSlotForKey(newKey, (RedissonReactiveRedisClusterConnection) connection);
}
return newKey;
@ -157,36 +128,38 @@ public class RedissonReactiveClusterKeyCommandsTest {
@Test
public void testRenameNX() {
connection.stringCommands().set(originalKey, value).block();
if (hasTtl) {
connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block();
}
testInClusterReactive(connection -> {
connection.stringCommands().set(originalKey, value).block();
if (hasTtl) {
connection.keyCommands().expire(originalKey, Duration.ofSeconds(1000)).block();
}
Integer originalSlot = getSlotForKey(originalKey);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot));
Integer originalSlot = getSlotForKey(originalKey, (RedissonReactiveRedisClusterConnection) connection);
newKey = getNewKeyForSlot(new String(originalKey.array()), getTargetSlot(originalSlot), connection);
Boolean result = connection.keyCommands().renameNX(originalKey, newKey).block();
Boolean result = connection.keyCommands().renameNX(originalKey, newKey).block();
assertThat(result).isTrue();
assertThat(connection.stringCommands().get(newKey).block()).isEqualTo(value);
if (hasTtl) {
assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0);
} else {
assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1);
}
assertThat(result).isTrue();
assertThat(connection.stringCommands().get(newKey).block()).isEqualTo(value);
if (hasTtl) {
assertThat(connection.keyCommands().ttl(newKey).block()).isGreaterThan(0);
} else {
assertThat(connection.keyCommands().ttl(newKey).block()).isEqualTo(-1);
}
connection.stringCommands().set(originalKey, value).block();
connection.stringCommands().set(originalKey, value).block();
result = connection.keyCommands().renameNX(originalKey, newKey).block();
result = connection.keyCommands().renameNX(originalKey, newKey).block();
assertThat(result).isFalse();
assertThat(result).isFalse();
});
}
private Integer getTargetSlot(Integer originalSlot) {
return sameSlot ? originalSlot : MAX_SLOT - originalSlot - 1;
}
private Integer getSlotForKey(ByteBuffer key) {
private Integer getSlotForKey(ByteBuffer key, RedissonReactiveRedisClusterConnection connection) {
return (Integer) connection.read(null, StringCodec.INSTANCE, RedisCommands.KEYSLOT, key.array()).block();
}

@ -0,0 +1,21 @@
package org.redisson.spring.data.connection;
import org.junit.Test;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import java.time.Duration;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonReactiveKeyCommandsTest extends BaseConnectionTest {
@Test
public void testExpiration() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
ReactiveStringRedisTemplate t = new ReactiveStringRedisTemplate(factory);
t.opsForValue().set("123", "4343").block();
t.expire("123", Duration.ofMillis(1001)).block();
assertThat(t.getExpire("123").block().toMillis()).isBetween(900L, 1000L);
}
}
Loading…
Cancel
Save