Feature - RESP3 protocol support. protocol setting added. #5413

pull/5427/head
Nikita Koksharov 1 year ago
parent be92e5a2c5
commit 68dd447835

@ -22,11 +22,18 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.JsonTypeConvertor;
import org.redisson.client.protocol.convertor.LongNumberConvertor;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.ListFirstObjectDecoder;
import org.redisson.client.protocol.decoder.ListMultiDecoder2;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringListListReplayDecoder;
import org.redisson.codec.JsonCodec;
import org.redisson.codec.JsonCodecWrapper;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.Protocol;
import java.math.BigDecimal;
import java.time.Duration;
@ -87,6 +94,9 @@ public class RedissonJsonBucket<V> extends RedissonExpirable implements RJsonBuc
@Override
public RFuture<V> getAsync() {
if (getServiceManager().getCfg().getProtocol() == Protocol.RESP3) {
return commandExecutor.readAsync(getRawName(), codec, RedisCommands.JSON_GET, getRawName(), ".");
}
return commandExecutor.readAsync(getRawName(), codec, RedisCommands.JSON_GET, getRawName());
}
@ -97,6 +107,12 @@ public class RedissonJsonBucket<V> extends RedissonExpirable implements RJsonBuc
@Override
public <T> RFuture<T> getAsync(JsonCodec<T> codec, String... paths) {
if (getServiceManager().getCfg().getProtocol() == Protocol.RESP3) {
if (paths.length == 0) {
paths = new String[]{"."};
}
}
List<Object> args = new ArrayList<>();
args.add(getRawName());
args.addAll(Arrays.asList(paths));
@ -761,7 +777,13 @@ public class RedissonJsonBucket<V> extends RedissonExpirable implements RJsonBuc
@Override
public <T extends Number> RFuture<T> incrementAndGetAsync(String path, T delta) {
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, new RedisCommand<>("JSON.NUMINCRBY", new NumberConvertor(delta.getClass())),
RedisCommand command;
if (getServiceManager().getCfg().getProtocol() == Protocol.RESP3) {
command = new RedisCommand<>("JSON.NUMINCRBY", new ListFirstObjectDecoder(), new LongNumberConvertor(delta.getClass()));
} else {
command = new RedisCommand<>("JSON.NUMINCRBY", new NumberConvertor(delta.getClass()));
}
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, command,
getRawName(), path, new BigDecimal(delta.toString()).toPlainString());
}
@ -784,7 +806,12 @@ public class RedissonJsonBucket<V> extends RedissonExpirable implements RJsonBuc
@Override
public RFuture<Long> countKeysAsync() {
return commandExecutor.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.JSON_OBJLEN, getRawName());
RedisStrictCommand command = RedisCommands.JSON_OBJLEN;
if (getServiceManager().getCfg().getProtocol() == Protocol.RESP3) {
command = new RedisStrictCommand("JSON.OBJLEN", new ListFirstObjectDecoder());
}
return commandExecutor.writeAsync(getRawName(), LongCodec.INSTANCE, command, getRawName());
}
@Override
@ -814,7 +841,12 @@ public class RedissonJsonBucket<V> extends RedissonExpirable implements RJsonBuc
@Override
public RFuture<List<String>> getKeysAsync() {
return commandExecutor.readAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.JSON_OBJKEYS, getRawName());
RedisCommand command = RedisCommands.JSON_OBJKEYS;
if (getServiceManager().getCfg().getProtocol() == Protocol.RESP3) {
command = new RedisCommand("JSON.OBJKEYS",
new ListMultiDecoder2(new ListFirstObjectDecoder(), new StringListListReplayDecoder()));
}
return commandExecutor.readAsync(getRawName(), LongCodec.INSTANCE, command, getRawName());
}
@Override
@ -864,7 +896,12 @@ public class RedissonJsonBucket<V> extends RedissonExpirable implements RJsonBuc
@Override
public RFuture<JsonType> getTypeAsync() {
return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.JSON_TYPE, getRawName());
RedisCommand command = RedisCommands.JSON_TYPE;
if (getServiceManager().getCfg().getProtocol() == Protocol.RESP3) {
command = new RedisCommand("JSON.TYPE", new ListFirstObjectDecoder(), new JsonTypeConvertor());
}
return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, command, getRawName());
}
@Override
@ -874,7 +911,12 @@ public class RedissonJsonBucket<V> extends RedissonExpirable implements RJsonBuc
@Override
public RFuture<JsonType> getTypeAsync(String path) {
return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.JSON_TYPE, getRawName(), path);
RedisCommand command = RedisCommands.JSON_TYPE;
if (getServiceManager().getCfg().getProtocol() == Protocol.RESP3) {
command = new RedisCommand("JSON.TYPE", new ListFirstObjectDecoder(), new JsonTypeConvertor());
}
return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, command, getRawName(), path);
}
@Override

@ -22,13 +22,17 @@ import org.redisson.api.search.aggregate.*;
import org.redisson.api.search.index.*;
import org.redisson.api.search.query.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.EmptyMapConvertor;
import org.redisson.client.protocol.decoder.*;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.Protocol;
import java.math.BigDecimal;
import java.util.ArrayList;
@ -151,7 +155,7 @@ public class RedissonSearch implements RSearch {
}
args.add("VECTOR");
args.add("HNSW");
args.add(params.getCount());
args.add(params.getCount()*2);
args.add("TYPE");
args.add(params.getType());
args.add("DIM");
@ -473,14 +477,27 @@ public class RedissonSearch implements RSearch {
args.add(options.getDialect());
}
RedisStrictCommand<SearchResult> command = new RedisStrictCommand<>("FT.SEARCH",
new ListMultiDecoder2(new SearchResultDecoder(),
new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)),
new ObjectListReplayDecoder()));
RedisStrictCommand<SearchResult> command;
if (isResp3()) {
command = new RedisStrictCommand<>("FT.SEARCH",
new ListMultiDecoder2(new SearchResultDecoderV2(),
new ObjectListReplayDecoder(),
new ObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec))));
} else {
command = new RedisStrictCommand<>("FT.SEARCH",
new ListMultiDecoder2(new SearchResultDecoder(),
new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)),
new ObjectListReplayDecoder()));
}
return commandExecutor.writeAsync(indexName, StringCodec.INSTANCE, command, args.toArray());
}
private boolean isResp3() {
return commandExecutor.getServiceManager().getCfg().getProtocol() == Protocol.RESP3;
}
private String value(double score, boolean exclusive) {
StringBuilder element = new StringBuilder();
if (Double.isInfinite(score)) {
@ -593,19 +610,35 @@ public class RedissonSearch implements RSearch {
}
RedisStrictCommand<AggregationResult> command;
if (options.isWithCursor()) {
command = new RedisStrictCommand<>("FT.AGGREGATE",
new ListMultiDecoder2(new AggregationCursorResultDecoder(),
new ObjectListReplayDecoder(),
new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec))));
if (isResp3()) {
if (options.isWithCursor()) {
command = new RedisStrictCommand<>("FT.AGGREGATE",
new ListMultiDecoder2(new AggregationCursorResultDecoderV2(),
new ObjectListReplayDecoder(),
new ObjectListReplayDecoder(),
new ObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec))));
} else {
command = new RedisStrictCommand<>("FT.AGGREGATE",
new ListMultiDecoder2(new AggregationResultDecoderV2(),
new ObjectListReplayDecoder(),
new ObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec))));
}
} else {
command = new RedisStrictCommand<>("FT.AGGREGATE",
new ListMultiDecoder2(new AggregationResultDecoder(),
new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)),
new ObjectListReplayDecoder()));
if (options.isWithCursor()) {
command = new RedisStrictCommand<>("FT.AGGREGATE",
new ListMultiDecoder2(new AggregationCursorResultDecoder(),
new ObjectListReplayDecoder(),
new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec))));
} else {
command = new RedisStrictCommand<>("FT.AGGREGATE",
new ListMultiDecoder2(new AggregationResultDecoder(),
new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)),
new ObjectListReplayDecoder()));
}
}
return commandExecutor.writeAsync(indexName, StringCodec.INSTANCE, command, args.toArray());
}
@ -703,10 +736,20 @@ public class RedissonSearch implements RSearch {
@Override
public RFuture<AggregationResult> readCursorAsync(String indexName, long cursorId) {
RedisStrictCommand command = new RedisStrictCommand<>("FT.CURSOR", "READ",
new ListMultiDecoder2(new AggregationCursorResultDecoder(),
new ObjectListReplayDecoder(),
new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec))));
RedisStrictCommand command;
if (isResp3()) {
command = new RedisStrictCommand<>("FT.CURSOR", "READ",
new ListMultiDecoder2(new AggregationCursorResultDecoderV2(),
new ObjectListReplayDecoder(),
new ObjectListReplayDecoder(),
new ObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec))));
} else {
command = new RedisStrictCommand<>("FT.CURSOR", "READ",
new ListMultiDecoder2(new AggregationCursorResultDecoder(),
new ObjectListReplayDecoder(),
new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec))));
}
return commandExecutor.writeAsync(indexName, StringCodec.INSTANCE, command, indexName, cursorId);
}
@ -824,7 +867,17 @@ public class RedissonSearch implements RSearch {
args.add(options.getDialect());
}
return commandExecutor.readAsync(indexName, StringCodec.INSTANCE, RedisCommands.FT_SPELLCHECK, args.toArray());
RedisCommand<Map<String, Map<String, Object>>> command = RedisCommands.FT_SPELLCHECK;
if (isResp3()) {
command = new RedisCommand<>("FT.SPELLCHECK",
new ListMultiDecoder2(
new ListObjectDecoder(1),
new ObjectMapReplayDecoder(),
new ListFirstObjectDecoder(new EmptyMapConvertor()),
new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, DoubleCodec.INSTANCE))));
}
return commandExecutor.readAsync(indexName, StringCodec.INSTANCE, command, args.toArray());
}
@Override

@ -20,10 +20,7 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.Timer;
import org.redisson.config.CommandMapper;
import org.redisson.config.CredentialsResolver;
import org.redisson.config.DefaultCommandMapper;
import org.redisson.config.SslProvider;
import org.redisson.config.*;
import org.redisson.misc.RedisURI;
import javax.net.ssl.KeyManagerFactory;
@ -85,6 +82,8 @@ public class RedisClientConfig {
private FailedNodeDetector failedNodeDetector = new FailedConnectionDetector();
private Protocol protocol = Protocol.RESP2;
public RedisClientConfig() {
}
@ -129,6 +128,7 @@ public class RedisClientConfig {
this.tcpKeepAliveIdle = config.tcpKeepAliveIdle;
this.tcpKeepAliveInterval = config.tcpKeepAliveInterval;
this.tcpUserTimeout = config.tcpUserTimeout;
this.protocol = config.protocol;
}
public NettyHook getNettyHook() {
@ -457,4 +457,13 @@ public class RedisClientConfig {
this.failedNodeDetector = failedNodeDetector;
return this;
}
public Protocol getProtocol() {
return protocol;
}
public RedisClientConfig setProtocol(Protocol protocol) {
this.protocol = protocol;
return this;
}
}

@ -19,6 +19,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.redisson.client.*;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Protocol;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@ -79,8 +80,10 @@ public abstract class BaseConnectionHandler<C extends RedisConnection> extends C
});
futures.add(f.toCompletableFuture());
// CompletionStage<Object> f1 = connection.async(RedisCommands.HELLO, "3");
// futures.add(f1.toCompletableFuture());
if (redisClient.getConfig().getProtocol() == Protocol.RESP3) {
CompletionStage<Object> f1 = connection.async(RedisCommands.HELLO, "3");
futures.add(f1.toCompletableFuture());
}
if (config.getDatabase() != 0) {
CompletionStage<Object> future = connection.async(RedisCommands.SELECT, config.getDatabase());

@ -164,7 +164,11 @@ public class CommandDecoder extends ReplayingDecoder<State> {
protected void skipDecode(ByteBuf in) throws IOException{
int code = in.readByte();
if (code == '+') {
if (code == '_') {
in.skipBytes(2);
} else if (code == ',') {
skipString(in);
} else if (code == '+') {
skipString(in);
} else if (code == '-') {
skipString(in);
@ -172,7 +176,14 @@ public class CommandDecoder extends ReplayingDecoder<State> {
skipString(in);
} else if (code == '$') {
skipBytes(in);
} else if (code == '*') {
} else if (code == '=') {
skipBytes(in);
} else if (code == '%') {
long size = readLong(in);
for (int i = 0; i < size * 2; i++) {
skipDecode(in);
}
} else if (code == '*' || code == '>' || code == '~') {
long size = readLong(in);
for (int i = 0; i < size; i++) {
skipDecode(in);
@ -335,9 +346,21 @@ public class CommandDecoder extends ReplayingDecoder<State> {
protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, boolean skipConvertor, List<CommandData<?, ?>> commandsData) throws IOException {
int code = in.readByte();
if (code == '+') {
if (code == '_') {
readCRLF(in);
Object result = null;
handleResult(data, parts, result, false);
} else if (code == '+') {
String result = readString(in);
handleResult(data, parts, result, skipConvertor);
} else if (code == ',') {
String str = readString(in);
Double result = Double.NaN;
if (!"nan".equals(str)) {
result = Double.valueOf(str);
}
handleResult(data, parts, result, skipConvertor);
} else if (code == '-') {
String error = readString(in);
@ -386,6 +409,15 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} else if (code == ':') {
Long result = readLong(in);
handleResult(data, parts, result, false);
} else if (code == '=') {
ByteBuf buf = readBytes(in);
Object result = null;
if (buf != null) {
buf.skipBytes(3);
Decoder<Object> decoder = selectDecoder(data, parts);
result = decoder.decode(buf, state());
}
handleResult(data, parts, result, false);
} else if (code == '$') {
ByteBuf buf = readBytes(in);
Object result = null;
@ -394,7 +426,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
result = decoder.decode(buf, state());
}
handleResult(data, parts, result, false);
} else if (code == '*') {
} else if (code == '*' || code == '>' || code == '~') {
long size = readLong(in);
List<Object> respParts = new ArrayList<Object>(Math.max((int) size, 0));
@ -403,7 +435,16 @@ public class CommandDecoder extends ReplayingDecoder<State> {
decodeList(in, data, parts, channel, size, respParts, skipConvertor, commandsData);
state().decLevel();
} else if (code == '%') {
long size = readLong(in) * 2;
List<Object> respParts = new ArrayList<Object>(Math.max((int) size, 0));
state().incLevel();
decodeList(in, data, parts, channel, size, respParts, skipConvertor, commandsData);
state().decLevel();
} else {
String dataStr = in.toString(0, in.writerIndex(), CharsetUtil.UTF_8);
throw new IllegalStateException("Can't decode replay: " + dataStr);
@ -420,7 +461,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
in.skipBytes(len + 2);
return result;
}
@SuppressWarnings("unchecked")
private void decodeList(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
Channel channel, long size, List<Object> respParts, boolean skipConvertor, List<CommandData<?, ?>> commandsData)
@ -514,6 +555,10 @@ public class CommandDecoder extends ReplayingDecoder<State> {
return buffer;
}
private void readCRLF(ByteBuf is) {
is.skipBytes(2);
}
private long readLong(ByteBuf is) throws IOException {
long size = 0;
int sign = 1;

@ -204,7 +204,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
});
}
} else {
if (data != null && data.getCommand().getName().equals("PING")) {
if (data != null) {
super.decodeResult(data, parts, channel, result);
}
}

@ -0,0 +1,60 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;
import java.math.BigDecimal;
/**
*
* @author Nikita Koksharov
*
*/
public class LongNumberConvertor implements Convertor<Object> {
private Class<?> resultClass;
public LongNumberConvertor(Class<?> resultClass) {
super();
this.resultClass = resultClass;
}
@Override
public Object convert(Object result) {
if (result instanceof Long) {
Long res = (Long) result;
if (resultClass.isAssignableFrom(Long.class)) {
return res;
}
if (resultClass.isAssignableFrom(Integer.class)) {
return res.intValue();
}
if (resultClass.isAssignableFrom(BigDecimal.class)) {
return new BigDecimal(res);
}
}
if (result instanceof Double) {
Double res = (Double) result;
if (resultClass.isAssignableFrom(Float.class)) {
return ((Double) result).floatValue();
}
if (resultClass.isAssignableFrom(Double.class)) {
return res;
}
}
throw new IllegalStateException("Wrong value type!");
}
}

@ -0,0 +1,55 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import org.redisson.api.search.aggregate.AggregationResult;
import org.redisson.client.handler.State;
import java.util.*;
/**
*
* @author Nikita Koksharov
*
*/
public class AggregationCursorResultDecoderV2 implements MultiDecoder<Object> {
@Override
public Object decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return new AggregationResult(0, Collections.emptyList(), -1);
}
List<Object> attrs = (List<Object>) parts.get(0);
Map<String, Object> m = new HashMap<>();
for (int i = 0; i < attrs.size(); i++) {
if (i % 2 != 0) {
m.put(attrs.get(i-1).toString(), attrs.get(i));
}
}
List<Map<String, Object>> docs = new ArrayList<>();
List<Map<String, Object>> results = (List<Map<String, Object>>) m.get("results");
for (Map<String, Object> result : results) {
Map<String, Object> map = (Map<String, Object>) result.get("extra_attributes");
docs.add(map);
}
Long total = (Long) m.get("total_results");
long cursorId = (long) parts.get(1);
return new AggregationResult(total, docs, cursorId);
}
}

@ -0,0 +1,56 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import org.redisson.api.search.aggregate.AggregationResult;
import org.redisson.client.handler.State;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
public class AggregationResultDecoderV2 implements MultiDecoder<Object> {
@Override
public Object decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return null;
}
Map<String, Object> m = new HashMap<>();
for (int i = 0; i < parts.size(); i++) {
if (i % 2 != 0) {
m.put(parts.get(i-1).toString(), parts.get(i));
}
}
List<Map<String, Object>> docs = new ArrayList<>();
List<Map<String, Object>> results = (List<Map<String, Object>>) m.get("results");
for (Map<String, Object> result : results) {
Map<String, Object> attrs = (Map<String, Object>) result.get("extra_attributes");
docs.add(attrs);
}
Long total = (Long) m.get("total_results");
return new AggregationResult(total, docs);
}
}

@ -79,6 +79,10 @@ public class IndexInfoDecoder implements MultiDecoder<Object> {
if (result.get(prop).toString().contains("nan")) {
return 0L;
}
if (result.get(prop) instanceof Double) {
Double d = (Double) result.get(prop);
return d.longValue();
}
return Long.valueOf(result.get(prop).toString());
}
}

@ -18,6 +18,7 @@ package org.redisson.client.protocol.decoder;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.convertor.Convertor;
import java.util.List;
@ -54,7 +55,7 @@ public class ListFirstObjectDecoder implements MultiDecoder<Object> {
@Override
public Object decode(List<Object> parts, State state) {
if (inner != null) {
if (inner != null && !parts.isEmpty() && !(parts.get(0) instanceof ScoredEntry)) {
parts = (List) inner.decode(parts, state);
}
if (!parts.isEmpty()) {

@ -43,7 +43,7 @@ public class ObjectFirstScoreReplayDecoder implements MultiDecoder<Double> {
if (parts.isEmpty()) {
return null;
}
return (Double) parts.get(1);
return (Double) parts.get(parts.size()-1);
}
}

@ -22,6 +22,7 @@ import org.redisson.client.protocol.Decoder;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
*
@ -57,6 +58,13 @@ public class ObjectMapReplayDecoder<K, V> implements MultiDecoder<Map<K, V>> {
@Override
public Map<K, V> decode(List<Object> parts, State state) {
if (!parts.isEmpty() && parts.get(0) instanceof Map) {
return ((List<Map<K, V>>) (Object) parts)
.stream()
.flatMap(v -> v.entrySet().stream())
.collect(Collectors.toMap(v -> v.getKey(), v -> v.getValue()));
}
Map<K, V> result = MultiDecoder.newLinkedHashMap(parts.size()/2);
for (int i = 0; i < parts.size(); i++) {
if (i % 2 != 0) {

@ -20,6 +20,10 @@ import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
*
* @author Nikita Koksharov
@ -35,4 +39,15 @@ public class ScoredSortedSetRandomMapDecoder extends ObjectMapReplayDecoder<Obje
return DoubleCodec.INSTANCE.getValueDecoder();
}
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
if (!parts.isEmpty() && parts.get(0) instanceof Map) {
return ((List<Map<Object, Object>>) (Object) parts)
.stream()
.flatMap(v -> v.entrySet().stream())
.collect(Collectors.toMap(v -> v.getKey(), v -> v.getValue()));
}
return super.decode(parts, state);
}
}

@ -17,6 +17,7 @@ package org.redisson.client.protocol.decoder;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
@ -42,6 +43,9 @@ public class ScoredSortedSetReplayDecoder<T> implements MultiDecoder<List<Scored
@Override
public List<ScoredEntry<T>> decode(List<Object> parts, State state) {
if (!parts.isEmpty() && parts.get(0) instanceof List) {
return ((List<List<ScoredEntry<T>>>) (Object) parts).stream().flatMap(v -> v.stream()).collect(Collectors.toList());
}
List<ScoredEntry<T>> result = new ArrayList<>();
for (int i = 0; i < parts.size(); i += 2) {
result.add(new ScoredEntry<T>(((Number) parts.get(i+1)).doubleValue(), (T) parts.get(i)));

@ -0,0 +1,55 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import org.redisson.api.search.query.Document;
import org.redisson.api.search.query.SearchResult;
import org.redisson.client.handler.State;
import java.util.*;
/**
*
* @author Nikita Koksharov
*
*/
public class SearchResultDecoderV2 implements MultiDecoder<Object> {
@Override
public Object decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return new SearchResult(0, Collections.emptyList());
}
Map<String, Object> m = new HashMap<>();
for (int i = 0; i < parts.size(); i++) {
if (i % 2 != 0) {
m.put(parts.get(i-1).toString(), parts.get(i));
}
}
List<Document> docs = new ArrayList<>();
List<Map<String, Object>> results = (List<Map<String, Object>>) m.get("results");
for (Map<String, Object> result : results) {
String id = (String) result.get("id");
Map<String, Object> attrs = (Map<String, Object>) result.get("extra_attributes");
docs.add(new Document(id, attrs));
}
Long total = (Long) m.get("total_results");
return new SearchResult(total, docs);
}
}

@ -96,6 +96,8 @@ public class Config {
private boolean lazyInitialization;
private Protocol protocol = Protocol.RESP2;
public Config() {
}
@ -127,6 +129,7 @@ public class Config {
setAddressResolverGroupFactory(oldConf.getAddressResolverGroupFactory());
setReliableTopicWatchdogTimeout(oldConf.getReliableTopicWatchdogTimeout());
setLazyInitialization(oldConf.isLazyInitialization());
setProtocol(oldConf.getProtocol());
if (oldConf.getSingleServerConfig() != null) {
setSingleServerConfig(new SingleServerConfig(oldConf.getSingleServerConfig()));
@ -879,10 +882,27 @@ public class Config {
*
* @param lazyInitialization <code>true</code> connects to Redis only when first Redis call is made,
* <code>false</code> connects to Redis during Redisson instance creation.
* @return
* @return config
*/
public Config setLazyInitialization(boolean lazyInitialization) {
this.lazyInitialization = lazyInitialization;
return this;
}
public Protocol getProtocol() {
return protocol;
}
/**
* Defines Redis protocol version.
* <p>
* Default value is <code>RESP2</code>
*
* @param protocol Redis protocol version
* @return config
*/
public Config setProtocol(Protocol protocol) {
this.protocol = protocol;
return this;
}
}

@ -0,0 +1,30 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.config;
/**
* Redis protocol version
*
* @author Nikita Koksharov
*
*/
public enum Protocol {
RESP2,
RESP3
}

@ -361,6 +361,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
.setPassword(config.getPassword())
.setNettyHook(serviceManager.getCfg().getNettyHook())
.setFailedNodeDetector(config.getFailedSlaveNodeDetector())
.setProtocol(serviceManager.getCfg().getProtocol())
.setCommandMapper(config.getCommandMapper())
.setCredentialsResolver(config.getCredentialsResolver())
.setConnectedListener(addr -> {

@ -201,7 +201,7 @@ public class RedisRunner {
private boolean randomDir = false;
private ArrayList<String> bindAddr = new ArrayList<>();
private int port = 6379;
private int retryCount = Integer.MAX_VALUE;
private int retryCount = 10;
private boolean randomPort = false;
private String sentinelFile;
private String clusterFile;
@ -294,7 +294,16 @@ public class RedisRunner {
throw new FailedToStartRedisException();
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
rp.stop();
if (RedissonRuntimeEnvironment.isWindows
&& RedissonRuntimeEnvironment.redisBinaryPath.contains("cmd")) {
try {
Runtime.getRuntime().exec("C:\\redis\\redis-server-stop.cmd");
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
rp.stop();
}
}));
return rp;
}
@ -927,31 +936,32 @@ public class RedisRunner {
}
public int stop() {
if (runner.isNosave() && !runner.isRandomDir()) {
RedisClient c = createDefaultRedisClientInstance();
RedisConnection connection = c.connect();
if (runner.isNosave()) {
RedisClientConfig config = new RedisClientConfig();
config.setConnectTimeout(1000);
config.setAddress(runner.getInitialBindAddr(), runner.getPort());
RedisClient c = RedisClient.create(config);
RedisConnection connection = null;
try {
connection.async(new RedisStrictCommand<Void>("SHUTDOWN", "NOSAVE", new VoidReplayConvertor()))
.get(3, TimeUnit.SECONDS);
} catch (InterruptedException interruptedException) {
//shutdown via command failed, lets wait and kill it later.
} catch (ExecutionException | TimeoutException e) {
connection = c.connect();
connection.async(new RedisStrictCommand<Void>("SHUTDOWN", "NOSAVE", new VoidReplayConvertor()));
} catch (Exception e) {
// skip
}
c.shutdown();
connection.closeAsync().syncUninterruptibly();
}
Process p = redisProcess;
p.destroy();
boolean normalTermination = false;
try {
normalTermination = p.waitFor(5, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
//OK lets hurry up by force kill;
}
if (!normalTermination) {
p = p.destroyForcibly();
}
// boolean normalTermination = false;
// try {
// normalTermination = p.waitFor(5, TimeUnit.SECONDS);
// } catch (InterruptedException ex) {
// //OK lets hurry up by force kill;
// }
// if (!normalTermination) {
// p = p.destroyForcibly();
// }
cleanup();
int exitCode = p.exitValue();
return exitCode == 1 && RedissonRuntimeEnvironment.isWindows ? 0 : exitCode;

@ -14,6 +14,7 @@ public class RedissonRuntimeEnvironment {
public static final String OS;
public static final boolean isWindows;
private static final String MAC_PATH = "/usr/local/opt/redis/bin/redis-server";
// private static final String WINDOW_PATH = "C:\\redis\\redis-server2.cmd";
private static final String WINDOW_PATH = "C:\\redis\\redis-server.exe";
static {

Loading…
Cancel
Save