diff --git a/redisson/src/main/java/org/redisson/RedissonJsonBucket.java b/redisson/src/main/java/org/redisson/RedissonJsonBucket.java index e89886e85..53b09d363 100644 --- a/redisson/src/main/java/org/redisson/RedissonJsonBucket.java +++ b/redisson/src/main/java/org/redisson/RedissonJsonBucket.java @@ -22,11 +22,18 @@ import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.RedisStrictCommand; +import org.redisson.client.protocol.convertor.JsonTypeConvertor; +import org.redisson.client.protocol.convertor.LongNumberConvertor; import org.redisson.client.protocol.convertor.NumberConvertor; +import org.redisson.client.protocol.decoder.ListFirstObjectDecoder; +import org.redisson.client.protocol.decoder.ListMultiDecoder2; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; +import org.redisson.client.protocol.decoder.StringListListReplayDecoder; import org.redisson.codec.JsonCodec; import org.redisson.codec.JsonCodecWrapper; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.config.Protocol; import java.math.BigDecimal; import java.time.Duration; @@ -87,6 +94,9 @@ public class RedissonJsonBucket extends RedissonExpirable implements RJsonBuc @Override public RFuture getAsync() { + if (getServiceManager().getCfg().getProtocol() == Protocol.RESP3) { + return commandExecutor.readAsync(getRawName(), codec, RedisCommands.JSON_GET, getRawName(), "."); + } return commandExecutor.readAsync(getRawName(), codec, RedisCommands.JSON_GET, getRawName()); } @@ -97,6 +107,12 @@ public class RedissonJsonBucket extends RedissonExpirable implements RJsonBuc @Override public RFuture getAsync(JsonCodec codec, String... paths) { + if (getServiceManager().getCfg().getProtocol() == Protocol.RESP3) { + if (paths.length == 0) { + paths = new String[]{"."}; + } + } + List args = new ArrayList<>(); args.add(getRawName()); args.addAll(Arrays.asList(paths)); @@ -761,7 +777,13 @@ public class RedissonJsonBucket extends RedissonExpirable implements RJsonBuc @Override public RFuture incrementAndGetAsync(String path, T delta) { - return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, new RedisCommand<>("JSON.NUMINCRBY", new NumberConvertor(delta.getClass())), + RedisCommand command; + if (getServiceManager().getCfg().getProtocol() == Protocol.RESP3) { + command = new RedisCommand<>("JSON.NUMINCRBY", new ListFirstObjectDecoder(), new LongNumberConvertor(delta.getClass())); + } else { + command = new RedisCommand<>("JSON.NUMINCRBY", new NumberConvertor(delta.getClass())); + } + return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, command, getRawName(), path, new BigDecimal(delta.toString()).toPlainString()); } @@ -784,7 +806,12 @@ public class RedissonJsonBucket extends RedissonExpirable implements RJsonBuc @Override public RFuture countKeysAsync() { - return commandExecutor.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.JSON_OBJLEN, getRawName()); + RedisStrictCommand command = RedisCommands.JSON_OBJLEN; + if (getServiceManager().getCfg().getProtocol() == Protocol.RESP3) { + command = new RedisStrictCommand("JSON.OBJLEN", new ListFirstObjectDecoder()); + } + + return commandExecutor.writeAsync(getRawName(), LongCodec.INSTANCE, command, getRawName()); } @Override @@ -814,7 +841,12 @@ public class RedissonJsonBucket extends RedissonExpirable implements RJsonBuc @Override public RFuture> getKeysAsync() { - return commandExecutor.readAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.JSON_OBJKEYS, getRawName()); + RedisCommand command = RedisCommands.JSON_OBJKEYS; + if (getServiceManager().getCfg().getProtocol() == Protocol.RESP3) { + command = new RedisCommand("JSON.OBJKEYS", + new ListMultiDecoder2(new ListFirstObjectDecoder(), new StringListListReplayDecoder())); + } + return commandExecutor.readAsync(getRawName(), LongCodec.INSTANCE, command, getRawName()); } @Override @@ -864,7 +896,12 @@ public class RedissonJsonBucket extends RedissonExpirable implements RJsonBuc @Override public RFuture getTypeAsync() { - return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.JSON_TYPE, getRawName()); + RedisCommand command = RedisCommands.JSON_TYPE; + if (getServiceManager().getCfg().getProtocol() == Protocol.RESP3) { + command = new RedisCommand("JSON.TYPE", new ListFirstObjectDecoder(), new JsonTypeConvertor()); + } + + return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, command, getRawName()); } @Override @@ -874,7 +911,12 @@ public class RedissonJsonBucket extends RedissonExpirable implements RJsonBuc @Override public RFuture getTypeAsync(String path) { - return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.JSON_TYPE, getRawName(), path); + RedisCommand command = RedisCommands.JSON_TYPE; + if (getServiceManager().getCfg().getProtocol() == Protocol.RESP3) { + command = new RedisCommand("JSON.TYPE", new ListFirstObjectDecoder(), new JsonTypeConvertor()); + } + + return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, command, getRawName(), path); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonSearch.java b/redisson/src/main/java/org/redisson/RedissonSearch.java index 41f59fb58..a02daacc9 100644 --- a/redisson/src/main/java/org/redisson/RedissonSearch.java +++ b/redisson/src/main/java/org/redisson/RedissonSearch.java @@ -22,13 +22,17 @@ import org.redisson.api.search.aggregate.*; import org.redisson.api.search.index.*; import org.redisson.api.search.query.*; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.DoubleCodec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; +import org.redisson.client.protocol.convertor.EmptyMapConvertor; import org.redisson.client.protocol.decoder.*; import org.redisson.codec.CompositeCodec; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.config.Protocol; import java.math.BigDecimal; import java.util.ArrayList; @@ -151,7 +155,7 @@ public class RedissonSearch implements RSearch { } args.add("VECTOR"); args.add("HNSW"); - args.add(params.getCount()); + args.add(params.getCount()*2); args.add("TYPE"); args.add(params.getType()); args.add("DIM"); @@ -473,14 +477,27 @@ public class RedissonSearch implements RSearch { args.add(options.getDialect()); } - RedisStrictCommand command = new RedisStrictCommand<>("FT.SEARCH", - new ListMultiDecoder2(new SearchResultDecoder(), - new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)), - new ObjectListReplayDecoder())); + RedisStrictCommand command; + if (isResp3()) { + command = new RedisStrictCommand<>("FT.SEARCH", + new ListMultiDecoder2(new SearchResultDecoderV2(), + new ObjectListReplayDecoder(), + new ObjectMapReplayDecoder(), + new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)))); + } else { + command = new RedisStrictCommand<>("FT.SEARCH", + new ListMultiDecoder2(new SearchResultDecoder(), + new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)), + new ObjectListReplayDecoder())); + } return commandExecutor.writeAsync(indexName, StringCodec.INSTANCE, command, args.toArray()); } + private boolean isResp3() { + return commandExecutor.getServiceManager().getCfg().getProtocol() == Protocol.RESP3; + } + private String value(double score, boolean exclusive) { StringBuilder element = new StringBuilder(); if (Double.isInfinite(score)) { @@ -593,19 +610,35 @@ public class RedissonSearch implements RSearch { } RedisStrictCommand command; - if (options.isWithCursor()) { - command = new RedisStrictCommand<>("FT.AGGREGATE", - new ListMultiDecoder2(new AggregationCursorResultDecoder(), - new ObjectListReplayDecoder(), - new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)))); + if (isResp3()) { + if (options.isWithCursor()) { + command = new RedisStrictCommand<>("FT.AGGREGATE", + new ListMultiDecoder2(new AggregationCursorResultDecoderV2(), + new ObjectListReplayDecoder(), + new ObjectListReplayDecoder(), + new ObjectMapReplayDecoder(), + new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)))); + } else { + command = new RedisStrictCommand<>("FT.AGGREGATE", + new ListMultiDecoder2(new AggregationResultDecoderV2(), + new ObjectListReplayDecoder(), + new ObjectMapReplayDecoder(), + new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)))); + } } else { - command = new RedisStrictCommand<>("FT.AGGREGATE", - new ListMultiDecoder2(new AggregationResultDecoder(), - new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)), - new ObjectListReplayDecoder())); + if (options.isWithCursor()) { + command = new RedisStrictCommand<>("FT.AGGREGATE", + new ListMultiDecoder2(new AggregationCursorResultDecoder(), + new ObjectListReplayDecoder(), + new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)))); + } else { + command = new RedisStrictCommand<>("FT.AGGREGATE", + new ListMultiDecoder2(new AggregationResultDecoder(), + new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)), + new ObjectListReplayDecoder())); + } } - return commandExecutor.writeAsync(indexName, StringCodec.INSTANCE, command, args.toArray()); } @@ -703,10 +736,20 @@ public class RedissonSearch implements RSearch { @Override public RFuture readCursorAsync(String indexName, long cursorId) { - RedisStrictCommand command = new RedisStrictCommand<>("FT.CURSOR", "READ", - new ListMultiDecoder2(new AggregationCursorResultDecoder(), - new ObjectListReplayDecoder(), - new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)))); + RedisStrictCommand command; + if (isResp3()) { + command = new RedisStrictCommand<>("FT.CURSOR", "READ", + new ListMultiDecoder2(new AggregationCursorResultDecoderV2(), + new ObjectListReplayDecoder(), + new ObjectListReplayDecoder(), + new ObjectMapReplayDecoder(), + new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)))); + } else { + command = new RedisStrictCommand<>("FT.CURSOR", "READ", + new ListMultiDecoder2(new AggregationCursorResultDecoder(), + new ObjectListReplayDecoder(), + new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, codec)))); + } return commandExecutor.writeAsync(indexName, StringCodec.INSTANCE, command, indexName, cursorId); } @@ -824,7 +867,17 @@ public class RedissonSearch implements RSearch { args.add(options.getDialect()); } - return commandExecutor.readAsync(indexName, StringCodec.INSTANCE, RedisCommands.FT_SPELLCHECK, args.toArray()); + RedisCommand>> command = RedisCommands.FT_SPELLCHECK; + if (isResp3()) { + command = new RedisCommand<>("FT.SPELLCHECK", + new ListMultiDecoder2( + new ListObjectDecoder(1), + new ObjectMapReplayDecoder(), + new ListFirstObjectDecoder(new EmptyMapConvertor()), + new ObjectMapReplayDecoder(new CompositeCodec(StringCodec.INSTANCE, DoubleCodec.INSTANCE)))); + } + + return commandExecutor.readAsync(indexName, StringCodec.INSTANCE, command, args.toArray()); } @Override diff --git a/redisson/src/main/java/org/redisson/client/RedisClientConfig.java b/redisson/src/main/java/org/redisson/client/RedisClientConfig.java index 5f3829648..c0681ce58 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClientConfig.java +++ b/redisson/src/main/java/org/redisson/client/RedisClientConfig.java @@ -20,10 +20,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.resolver.AddressResolverGroup; import io.netty.util.Timer; -import org.redisson.config.CommandMapper; -import org.redisson.config.CredentialsResolver; -import org.redisson.config.DefaultCommandMapper; -import org.redisson.config.SslProvider; +import org.redisson.config.*; import org.redisson.misc.RedisURI; import javax.net.ssl.KeyManagerFactory; @@ -85,6 +82,8 @@ public class RedisClientConfig { private FailedNodeDetector failedNodeDetector = new FailedConnectionDetector(); + private Protocol protocol = Protocol.RESP2; + public RedisClientConfig() { } @@ -129,6 +128,7 @@ public class RedisClientConfig { this.tcpKeepAliveIdle = config.tcpKeepAliveIdle; this.tcpKeepAliveInterval = config.tcpKeepAliveInterval; this.tcpUserTimeout = config.tcpUserTimeout; + this.protocol = config.protocol; } public NettyHook getNettyHook() { @@ -457,4 +457,13 @@ public class RedisClientConfig { this.failedNodeDetector = failedNodeDetector; return this; } + + public Protocol getProtocol() { + return protocol; + } + + public RedisClientConfig setProtocol(Protocol protocol) { + this.protocol = protocol; + return this; + } } diff --git a/redisson/src/main/java/org/redisson/client/handler/BaseConnectionHandler.java b/redisson/src/main/java/org/redisson/client/handler/BaseConnectionHandler.java index 712bdfd84..f2cb38c19 100644 --- a/redisson/src/main/java/org/redisson/client/handler/BaseConnectionHandler.java +++ b/redisson/src/main/java/org/redisson/client/handler/BaseConnectionHandler.java @@ -19,6 +19,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.redisson.client.*; import org.redisson.client.protocol.RedisCommands; +import org.redisson.config.Protocol; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -79,8 +80,10 @@ public abstract class BaseConnectionHandler extends C }); futures.add(f.toCompletableFuture()); -// CompletionStage f1 = connection.async(RedisCommands.HELLO, "3"); -// futures.add(f1.toCompletableFuture()); + if (redisClient.getConfig().getProtocol() == Protocol.RESP3) { + CompletionStage f1 = connection.async(RedisCommands.HELLO, "3"); + futures.add(f1.toCompletableFuture()); + } if (config.getDatabase() != 0) { CompletionStage future = connection.async(RedisCommands.SELECT, config.getDatabase()); diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index ce7a83543..74951774c 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -164,7 +164,11 @@ public class CommandDecoder extends ReplayingDecoder { protected void skipDecode(ByteBuf in) throws IOException{ int code = in.readByte(); - if (code == '+') { + if (code == '_') { + in.skipBytes(2); + } else if (code == ',') { + skipString(in); + } else if (code == '+') { skipString(in); } else if (code == '-') { skipString(in); @@ -172,7 +176,14 @@ public class CommandDecoder extends ReplayingDecoder { skipString(in); } else if (code == '$') { skipBytes(in); - } else if (code == '*') { + } else if (code == '=') { + skipBytes(in); + } else if (code == '%') { + long size = readLong(in); + for (int i = 0; i < size * 2; i++) { + skipDecode(in); + } + } else if (code == '*' || code == '>' || code == '~') { long size = readLong(in); for (int i = 0; i < size; i++) { skipDecode(in); @@ -335,9 +346,21 @@ public class CommandDecoder extends ReplayingDecoder { protected void decode(ByteBuf in, CommandData data, List parts, Channel channel, boolean skipConvertor, List> commandsData) throws IOException { int code = in.readByte(); - if (code == '+') { + if (code == '_') { + readCRLF(in); + Object result = null; + handleResult(data, parts, result, false); + } else if (code == '+') { String result = readString(in); + handleResult(data, parts, result, skipConvertor); + } else if (code == ',') { + String str = readString(in); + Double result = Double.NaN; + if (!"nan".equals(str)) { + result = Double.valueOf(str); + } + handleResult(data, parts, result, skipConvertor); } else if (code == '-') { String error = readString(in); @@ -386,6 +409,15 @@ public class CommandDecoder extends ReplayingDecoder { } else if (code == ':') { Long result = readLong(in); handleResult(data, parts, result, false); + } else if (code == '=') { + ByteBuf buf = readBytes(in); + Object result = null; + if (buf != null) { + buf.skipBytes(3); + Decoder decoder = selectDecoder(data, parts); + result = decoder.decode(buf, state()); + } + handleResult(data, parts, result, false); } else if (code == '$') { ByteBuf buf = readBytes(in); Object result = null; @@ -394,7 +426,7 @@ public class CommandDecoder extends ReplayingDecoder { result = decoder.decode(buf, state()); } handleResult(data, parts, result, false); - } else if (code == '*') { + } else if (code == '*' || code == '>' || code == '~') { long size = readLong(in); List respParts = new ArrayList(Math.max((int) size, 0)); @@ -403,7 +435,16 @@ public class CommandDecoder extends ReplayingDecoder { decodeList(in, data, parts, channel, size, respParts, skipConvertor, commandsData); state().decLevel(); - + + } else if (code == '%') { + long size = readLong(in) * 2; + List respParts = new ArrayList(Math.max((int) size, 0)); + + state().incLevel(); + + decodeList(in, data, parts, channel, size, respParts, skipConvertor, commandsData); + + state().decLevel(); } else { String dataStr = in.toString(0, in.writerIndex(), CharsetUtil.UTF_8); throw new IllegalStateException("Can't decode replay: " + dataStr); @@ -420,7 +461,7 @@ public class CommandDecoder extends ReplayingDecoder { in.skipBytes(len + 2); return result; } - + @SuppressWarnings("unchecked") private void decodeList(ByteBuf in, CommandData data, List parts, Channel channel, long size, List respParts, boolean skipConvertor, List> commandsData) @@ -514,6 +555,10 @@ public class CommandDecoder extends ReplayingDecoder { return buffer; } + private void readCRLF(ByteBuf is) { + is.skipBytes(2); + } + private long readLong(ByteBuf is) throws IOException { long size = 0; int sign = 1; diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java index 2e08831e5..4ccb9cfe8 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java @@ -204,7 +204,7 @@ public class CommandPubSubDecoder extends CommandDecoder { }); } } else { - if (data != null && data.getCommand().getName().equals("PING")) { + if (data != null) { super.decodeResult(data, parts, channel, result); } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/LongNumberConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/LongNumberConvertor.java new file mode 100644 index 000000000..c690d71eb --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/LongNumberConvertor.java @@ -0,0 +1,60 @@ +/** + * Copyright (c) 2013-2022 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.convertor; + +import java.math.BigDecimal; + +/** + * + * @author Nikita Koksharov + * + */ +public class LongNumberConvertor implements Convertor { + + private Class resultClass; + + public LongNumberConvertor(Class resultClass) { + super(); + this.resultClass = resultClass; + } + + @Override + public Object convert(Object result) { + if (result instanceof Long) { + Long res = (Long) result; + if (resultClass.isAssignableFrom(Long.class)) { + return res; + } + if (resultClass.isAssignableFrom(Integer.class)) { + return res.intValue(); + } + if (resultClass.isAssignableFrom(BigDecimal.class)) { + return new BigDecimal(res); + } + } + if (result instanceof Double) { + Double res = (Double) result; + if (resultClass.isAssignableFrom(Float.class)) { + return ((Double) result).floatValue(); + } + if (resultClass.isAssignableFrom(Double.class)) { + return res; + } + } + throw new IllegalStateException("Wrong value type!"); + } + +} diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/AggregationCursorResultDecoderV2.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/AggregationCursorResultDecoderV2.java new file mode 100644 index 000000000..05d70b096 --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/AggregationCursorResultDecoderV2.java @@ -0,0 +1,55 @@ +/** + * Copyright (c) 2013-2022 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import org.redisson.api.search.aggregate.AggregationResult; +import org.redisson.client.handler.State; + +import java.util.*; + +/** + * + * @author Nikita Koksharov + * + */ +public class AggregationCursorResultDecoderV2 implements MultiDecoder { + + @Override + public Object decode(List parts, State state) { + if (parts.isEmpty()) { + return new AggregationResult(0, Collections.emptyList(), -1); + } + + List attrs = (List) parts.get(0); + Map m = new HashMap<>(); + for (int i = 0; i < attrs.size(); i++) { + if (i % 2 != 0) { + m.put(attrs.get(i-1).toString(), attrs.get(i)); + } + } + + List> docs = new ArrayList<>(); + List> results = (List>) m.get("results"); + for (Map result : results) { + Map map = (Map) result.get("extra_attributes"); + docs.add(map); + } + Long total = (Long) m.get("total_results"); + long cursorId = (long) parts.get(1); + return new AggregationResult(total, docs, cursorId); + } + +} diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/AggregationResultDecoderV2.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/AggregationResultDecoderV2.java new file mode 100644 index 000000000..75713bc69 --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/AggregationResultDecoderV2.java @@ -0,0 +1,56 @@ +/** + * Copyright (c) 2013-2022 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import org.redisson.api.search.aggregate.AggregationResult; +import org.redisson.client.handler.State; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * + * @author Nikita Koksharov + * + */ +public class AggregationResultDecoderV2 implements MultiDecoder { + + @Override + public Object decode(List parts, State state) { + if (parts.isEmpty()) { + return null; + } + + Map m = new HashMap<>(); + for (int i = 0; i < parts.size(); i++) { + if (i % 2 != 0) { + m.put(parts.get(i-1).toString(), parts.get(i)); + } + } + + List> docs = new ArrayList<>(); + List> results = (List>) m.get("results"); + for (Map result : results) { + Map attrs = (Map) result.get("extra_attributes"); + docs.add(attrs); + } + Long total = (Long) m.get("total_results"); + return new AggregationResult(total, docs); + } + +} diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/IndexInfoDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/IndexInfoDecoder.java index f6b53fcc6..a6da5ce93 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/IndexInfoDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/IndexInfoDecoder.java @@ -79,6 +79,10 @@ public class IndexInfoDecoder implements MultiDecoder { if (result.get(prop).toString().contains("nan")) { return 0L; } + if (result.get(prop) instanceof Double) { + Double d = (Double) result.get(prop); + return d.longValue(); + } return Long.valueOf(result.get(prop).toString()); } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListFirstObjectDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListFirstObjectDecoder.java index eab2a7a5e..9d8853ca8 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListFirstObjectDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListFirstObjectDecoder.java @@ -18,6 +18,7 @@ package org.redisson.client.protocol.decoder; import org.redisson.client.codec.Codec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.convertor.Convertor; import java.util.List; @@ -54,7 +55,7 @@ public class ListFirstObjectDecoder implements MultiDecoder { @Override public Object decode(List parts, State state) { - if (inner != null) { + if (inner != null && !parts.isEmpty() && !(parts.get(0) instanceof ScoredEntry)) { parts = (List) inner.decode(parts, state); } if (!parts.isEmpty()) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java index f422fcf2a..e4a49cde5 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java @@ -43,7 +43,7 @@ public class ObjectFirstScoreReplayDecoder implements MultiDecoder { if (parts.isEmpty()) { return null; } - return (Double) parts.get(1); + return (Double) parts.get(parts.size()-1); } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java index 208e614ef..456c72d7a 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java @@ -22,6 +22,7 @@ import org.redisson.client.protocol.Decoder; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; /** * @@ -57,6 +58,13 @@ public class ObjectMapReplayDecoder implements MultiDecoder> { @Override public Map decode(List parts, State state) { + if (!parts.isEmpty() && parts.get(0) instanceof Map) { + return ((List>) (Object) parts) + .stream() + .flatMap(v -> v.entrySet().stream()) + .collect(Collectors.toMap(v -> v.getKey(), v -> v.getValue())); + } + Map result = MultiDecoder.newLinkedHashMap(parts.size()/2); for (int i = 0; i < parts.size(); i++) { if (i % 2 != 0) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetRandomMapDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetRandomMapDecoder.java index 6f0c7642c..a44c17362 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetRandomMapDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetRandomMapDecoder.java @@ -20,6 +20,10 @@ import org.redisson.client.codec.DoubleCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + /** * * @author Nikita Koksharov @@ -35,4 +39,15 @@ public class ScoredSortedSetRandomMapDecoder extends ObjectMapReplayDecoder decode(List parts, State state) { + if (!parts.isEmpty() && parts.get(0) instanceof Map) { + return ((List>) (Object) parts) + .stream() + .flatMap(v -> v.entrySet().stream()) + .collect(Collectors.toMap(v -> v.getKey(), v -> v.getValue())); + } + + return super.decode(parts, state); + } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetReplayDecoder.java index 40bb1da0c..b0057de90 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetReplayDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetReplayDecoder.java @@ -17,6 +17,7 @@ package org.redisson.client.protocol.decoder; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; import org.redisson.client.codec.Codec; import org.redisson.client.codec.DoubleCodec; @@ -42,6 +43,9 @@ public class ScoredSortedSetReplayDecoder implements MultiDecoder> decode(List parts, State state) { + if (!parts.isEmpty() && parts.get(0) instanceof List) { + return ((List>>) (Object) parts).stream().flatMap(v -> v.stream()).collect(Collectors.toList()); + } List> result = new ArrayList<>(); for (int i = 0; i < parts.size(); i += 2) { result.add(new ScoredEntry(((Number) parts.get(i+1)).doubleValue(), (T) parts.get(i))); diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/SearchResultDecoderV2.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/SearchResultDecoderV2.java new file mode 100644 index 000000000..33265ae63 --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/SearchResultDecoderV2.java @@ -0,0 +1,55 @@ +/** + * Copyright (c) 2013-2022 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import org.redisson.api.search.query.Document; +import org.redisson.api.search.query.SearchResult; +import org.redisson.client.handler.State; + +import java.util.*; + +/** + * + * @author Nikita Koksharov + * + */ +public class SearchResultDecoderV2 implements MultiDecoder { + + @Override + public Object decode(List parts, State state) { + if (parts.isEmpty()) { + return new SearchResult(0, Collections.emptyList()); + } + + Map m = new HashMap<>(); + for (int i = 0; i < parts.size(); i++) { + if (i % 2 != 0) { + m.put(parts.get(i-1).toString(), parts.get(i)); + } + } + + List docs = new ArrayList<>(); + List> results = (List>) m.get("results"); + for (Map result : results) { + String id = (String) result.get("id"); + Map attrs = (Map) result.get("extra_attributes"); + docs.add(new Document(id, attrs)); + } + Long total = (Long) m.get("total_results"); + return new SearchResult(total, docs); + } + +} diff --git a/redisson/src/main/java/org/redisson/config/Config.java b/redisson/src/main/java/org/redisson/config/Config.java index 911192977..7a97416cb 100644 --- a/redisson/src/main/java/org/redisson/config/Config.java +++ b/redisson/src/main/java/org/redisson/config/Config.java @@ -96,6 +96,8 @@ public class Config { private boolean lazyInitialization; + private Protocol protocol = Protocol.RESP2; + public Config() { } @@ -127,6 +129,7 @@ public class Config { setAddressResolverGroupFactory(oldConf.getAddressResolverGroupFactory()); setReliableTopicWatchdogTimeout(oldConf.getReliableTopicWatchdogTimeout()); setLazyInitialization(oldConf.isLazyInitialization()); + setProtocol(oldConf.getProtocol()); if (oldConf.getSingleServerConfig() != null) { setSingleServerConfig(new SingleServerConfig(oldConf.getSingleServerConfig())); @@ -879,10 +882,27 @@ public class Config { * * @param lazyInitialization true connects to Redis only when first Redis call is made, * false connects to Redis during Redisson instance creation. - * @return + * @return config */ public Config setLazyInitialization(boolean lazyInitialization) { this.lazyInitialization = lazyInitialization; return this; } + + public Protocol getProtocol() { + return protocol; + } + + /** + * Defines Redis protocol version. + *

+ * Default value is RESP2 + * + * @param protocol Redis protocol version + * @return config + */ + public Config setProtocol(Protocol protocol) { + this.protocol = protocol; + return this; + } } diff --git a/redisson/src/main/java/org/redisson/config/Protocol.java b/redisson/src/main/java/org/redisson/config/Protocol.java new file mode 100644 index 000000000..8f7216e91 --- /dev/null +++ b/redisson/src/main/java/org/redisson/config/Protocol.java @@ -0,0 +1,30 @@ +/** + * Copyright (c) 2013-2022 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.config; + +/** + * Redis protocol version + * + * @author Nikita Koksharov + * + */ +public enum Protocol { + + RESP2, + + RESP3 + +} diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 2b6f9b98a..2187687a3 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -361,6 +361,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { .setPassword(config.getPassword()) .setNettyHook(serviceManager.getCfg().getNettyHook()) .setFailedNodeDetector(config.getFailedSlaveNodeDetector()) + .setProtocol(serviceManager.getCfg().getProtocol()) .setCommandMapper(config.getCommandMapper()) .setCredentialsResolver(config.getCredentialsResolver()) .setConnectedListener(addr -> { diff --git a/redisson/src/test/java/org/redisson/RedisRunner.java b/redisson/src/test/java/org/redisson/RedisRunner.java index f1efc3313..2b71a6bb1 100644 --- a/redisson/src/test/java/org/redisson/RedisRunner.java +++ b/redisson/src/test/java/org/redisson/RedisRunner.java @@ -201,7 +201,7 @@ public class RedisRunner { private boolean randomDir = false; private ArrayList bindAddr = new ArrayList<>(); private int port = 6379; - private int retryCount = Integer.MAX_VALUE; + private int retryCount = 10; private boolean randomPort = false; private String sentinelFile; private String clusterFile; @@ -294,7 +294,16 @@ public class RedisRunner { throw new FailedToStartRedisException(); } Runtime.getRuntime().addShutdownHook(new Thread(() -> { - rp.stop(); + if (RedissonRuntimeEnvironment.isWindows + && RedissonRuntimeEnvironment.redisBinaryPath.contains("cmd")) { + try { + Runtime.getRuntime().exec("C:\\redis\\redis-server-stop.cmd"); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + rp.stop(); + } })); return rp; } @@ -927,31 +936,32 @@ public class RedisRunner { } public int stop() { - if (runner.isNosave() && !runner.isRandomDir()) { - RedisClient c = createDefaultRedisClientInstance(); - RedisConnection connection = c.connect(); + if (runner.isNosave()) { + RedisClientConfig config = new RedisClientConfig(); + config.setConnectTimeout(1000); + config.setAddress(runner.getInitialBindAddr(), runner.getPort()); + RedisClient c = RedisClient.create(config); + + RedisConnection connection = null; try { - connection.async(new RedisStrictCommand("SHUTDOWN", "NOSAVE", new VoidReplayConvertor())) - .get(3, TimeUnit.SECONDS); - } catch (InterruptedException interruptedException) { - //shutdown via command failed, lets wait and kill it later. - } catch (ExecutionException | TimeoutException e) { + connection = c.connect(); + connection.async(new RedisStrictCommand("SHUTDOWN", "NOSAVE", new VoidReplayConvertor())); + } catch (Exception e) { // skip } c.shutdown(); - connection.closeAsync().syncUninterruptibly(); } Process p = redisProcess; p.destroy(); - boolean normalTermination = false; - try { - normalTermination = p.waitFor(5, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - //OK lets hurry up by force kill; - } - if (!normalTermination) { - p = p.destroyForcibly(); - } +// boolean normalTermination = false; +// try { +// normalTermination = p.waitFor(5, TimeUnit.SECONDS); +// } catch (InterruptedException ex) { +// //OK lets hurry up by force kill; +// } +// if (!normalTermination) { +// p = p.destroyForcibly(); +// } cleanup(); int exitCode = p.exitValue(); return exitCode == 1 && RedissonRuntimeEnvironment.isWindows ? 0 : exitCode; diff --git a/redisson/src/test/java/org/redisson/RedissonRuntimeEnvironment.java b/redisson/src/test/java/org/redisson/RedissonRuntimeEnvironment.java index 6966ce531..236d37dae 100644 --- a/redisson/src/test/java/org/redisson/RedissonRuntimeEnvironment.java +++ b/redisson/src/test/java/org/redisson/RedissonRuntimeEnvironment.java @@ -14,6 +14,7 @@ public class RedissonRuntimeEnvironment { public static final String OS; public static final boolean isWindows; private static final String MAC_PATH = "/usr/local/opt/redis/bin/redis-server"; +// private static final String WINDOW_PATH = "C:\\redis\\redis-server2.cmd"; private static final String WINDOW_PATH = "C:\\redis\\redis-server.exe"; static {