From d444adab9aaf40a466e1cbc063a409c0dd7c95ab Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 10 Jul 2017 18:51:50 +0300 Subject: [PATCH 1/4] refactoring --- .../java/org/redisson/client/protocol/RedisCommands.java | 6 +++--- .../java/org/redisson/reactive/RedissonListReactive.java | 9 +++++---- .../org/redisson/reactive/RedissonSetCacheReactive.java | 4 ++-- .../java/org/redisson/reactive/RedissonSetReactive.java | 2 +- .../src/test/java/org/redisson/CommandHandlersTest.java | 4 ++-- 5 files changed, 13 insertions(+), 12 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index f1c7b3fe0..be3dbfc2a 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -143,7 +143,7 @@ public interface RedisCommands { RedisCommand> EXEC = new RedisCommand>("EXEC", new ObjectListReplayDecoder()); RedisCommand SADD_BOOL = new RedisCommand("SADD", new BooleanAmountReplayConvertor(), 2, ValueType.OBJECTS); - RedisStrictCommand SADD = new RedisStrictCommand("SADD", 2, ValueType.OBJECTS); + RedisCommand SADD = new RedisCommand("SADD", new IntegerReplayConvertor(), 2, ValueType.OBJECTS); RedisCommand> SPOP = new RedisCommand>("SPOP", new ObjectSetReplayDecoder()); RedisCommand SPOP_SINGLE = new RedisCommand("SPOP"); RedisCommand SADD_SINGLE = new RedisCommand("SADD", new BooleanReplayConvertor(), 2); @@ -200,12 +200,12 @@ public interface RedisCommands { RedisCommand SORT_TO = new RedisCommand("SORT", new IntegerReplayConvertor()); RedisStrictCommand RPOP = new RedisStrictCommand("RPOP"); - RedisStrictCommand LPUSH = new RedisStrictCommand("LPUSH", 2, ValueType.OBJECTS); + RedisCommand LPUSH = new RedisCommand("LPUSH", new IntegerReplayConvertor(), 2, ValueType.OBJECTS); RedisCommand LPUSH_BOOLEAN = new RedisCommand("LPUSH", new TrueReplayConvertor(), 2, ValueType.OBJECTS); RedisStrictCommand LPUSH_VOID = new RedisStrictCommand("LPUSH", new VoidReplayConvertor(), 2); RedisCommand> LRANGE = new RedisCommand>("LRANGE", new ObjectListReplayDecoder()); RedisCommand> LRANGE_SET = new RedisCommand>("LRANGE", new ObjectSetReplayDecoder()); - RedisCommand RPUSH = new RedisCommand("RPUSH", 2, ValueType.OBJECTS); + RedisCommand RPUSH = new RedisCommand("RPUSH", new IntegerReplayConvertor(), 2, ValueType.OBJECTS); RedisCommand RPUSH_BOOLEAN = new RedisCommand("RPUSH", new TrueReplayConvertor(), 2, ValueType.OBJECTS); RedisCommand RPUSH_VOID = new RedisCommand("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java index 040f494ea..bd6a70a71 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -35,6 +35,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.command.CommandReactiveExecutor; @@ -68,7 +69,7 @@ public class RedissonListReactive extends RedissonExpirableReactive implement @Override public Publisher size() { - return commandExecutor.readReactive(getName(), codec, LLEN, getName()); + return commandExecutor.readReactive(getName(), codec, RedisCommands.LLEN_INT, getName()); } @Override @@ -220,7 +221,7 @@ public class RedissonListReactive extends RedissonExpirableReactive implement List args = new ArrayList(coll.size() + 1); args.add(index); args.addAll(coll); - return commandExecutor.evalWriteReactive(getName(), codec, new RedisCommand("EVAL", new LongReplayConvertor(), 5, ValueType.OBJECTS), + return commandExecutor.evalWriteReactive(getName(), codec, new RedisCommand("EVAL", new IntegerReplayConvertor(), 5, ValueType.OBJECTS), "local ind = table.remove(ARGV, 1); " + // index is the first parameter "local size = redis.call('llen', KEYS[1]); " + "assert(tonumber(ind) <= size, 'index: ' .. ind .. ' but current size: ' .. size); " + @@ -332,8 +333,8 @@ public class RedissonListReactive extends RedissonExpirableReactive implement } }).count().next().poll(); - boolean res = count.equals(Streams.wrap(size()).next().poll()); - res &= count.equals(Streams.wrap(((RedissonListReactive) o).size()).next().poll()); + boolean res = count.intValue() == Streams.wrap(size()).next().poll(); + res &= count.intValue() == Streams.wrap(((RedissonListReactive) o).size()).next().poll(); return res; } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index c4882f69f..1a6765ac4 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -71,7 +71,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple @Override public Publisher size() { - return commandExecutor.readReactive(getName(), codec, RedisCommands.ZCARD, getName()); + return commandExecutor.readReactive(getName(), codec, RedisCommands.ZCARD_INT, getName()); } @Override @@ -116,7 +116,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple @Override public Publisher add(V value) { long timeoutDate = 92233720368547758L; - return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_LONG, + return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_INTEGER, "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); " + "if expireDateScore ~= false and tonumber(expireDateScore) > tonumber(ARGV[1]) then " + "return 0;" diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java index 647cdb2fb..54d8e4f25 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -63,7 +63,7 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements @Override public Publisher size() { - return commandExecutor.readReactive(getName(), codec, RedisCommands.SCARD, getName()); + return commandExecutor.readReactive(getName(), codec, RedisCommands.SCARD_INT, getName()); } @Override diff --git a/redisson/src/test/java/org/redisson/CommandHandlersTest.java b/redisson/src/test/java/org/redisson/CommandHandlersTest.java index 98099ac94..111d1f890 100644 --- a/redisson/src/test/java/org/redisson/CommandHandlersTest.java +++ b/redisson/src/test/java/org/redisson/CommandHandlersTest.java @@ -7,7 +7,7 @@ import org.redisson.config.Config; public class CommandHandlersTest extends BaseTest { - @Test(expected = RedisException.class) + @Test(expected = RuntimeException.class) public void testEncoder() throws InterruptedException { Config config = createConfig(); config.setCodec(new ErrorsCodec()); @@ -17,7 +17,7 @@ public class CommandHandlersTest extends BaseTest { redisson.getBucket("1234").set("1234"); } - @Test(expected = RedisException.class) + @Test(expected = RuntimeException.class) public void testDecoder() { redisson.getBucket("1234").set("1234"); From 629f1ec503f95aa2393882681152cbd8a6eadcdf Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 11 Jul 2017 15:00:21 +0300 Subject: [PATCH 2/4] invoke stopThreads method on any error during Replicated or Sentinel connection start --- .../org/redisson/connection/ReplicatedConnectionManager.java | 2 ++ .../org/redisson/connection/SentinelConnectionManager.java | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java index 792c01484..329ff31fa 100644 --- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java @@ -84,6 +84,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { Role role = Role.valueOf(connection.sync(RedisCommands.INFO_REPLICATION).get(ROLE_KEY)); if (Role.master.equals(role)) { if (currentMaster.get() != null) { + stopThreads(); throw new RedisException("Multiple masters detected"); } currentMaster.set(addr); @@ -96,6 +97,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { } if (currentMaster.get() == null) { + stopThreads(); throw new RedisConnectionException("Can't connect to servers!"); } diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index bd13f9462..4e9c3588c 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -71,6 +71,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { this.config = create(cfg); initTimer(this.config); + if (cfg.getMasterName() == null) { + throw new IllegalArgumentException("masterName parameter is not defined!"); + } + for (URI addr : cfg.getSentinelAddresses()) { RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts()); try { @@ -119,6 +123,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } if (currentMaster.get() == null) { + stopThreads(); throw new RedisConnectionException("Can't connect to servers!"); } init(this.config); From b2b723d6f5ceecc49b47f5fe9e62554b3c8d329c Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 11 Jul 2017 15:02:33 +0300 Subject: [PATCH 3/4] new decoder added --- .../decoder/MapCacheGetAllDecoder.java | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 redisson/src/main/java/org/redisson/connection/decoder/MapCacheGetAllDecoder.java diff --git a/redisson/src/main/java/org/redisson/connection/decoder/MapCacheGetAllDecoder.java b/redisson/src/main/java/org/redisson/connection/decoder/MapCacheGetAllDecoder.java new file mode 100644 index 000000000..8bf4185c4 --- /dev/null +++ b/redisson/src/main/java/org/redisson/connection/decoder/MapCacheGetAllDecoder.java @@ -0,0 +1,83 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.connection.decoder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.redisson.client.codec.LongCodec; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.decoder.MultiDecoder; + +import io.netty.buffer.ByteBuf; + +/** + * + * @author Nikita Koksharov + * + */ +public class MapCacheGetAllDecoder implements MultiDecoder> { + + private final int shiftIndex; + private final List args; + private final boolean allowNulls; + + public MapCacheGetAllDecoder(List args, int shiftIndex) { + this(args, shiftIndex, false); + } + + public MapCacheGetAllDecoder(List args, int shiftIndex, boolean allowNulls) { + this.args = args; + this.shiftIndex = shiftIndex; + this.allowNulls = allowNulls; + } + + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + return LongCodec.INSTANCE.getValueDecoder().decode(buf, state); + } + + @Override + public boolean isApplicable(int paramNum, State state) { + return false; + } + + @Override + public List decode(List parts, State state) { + if (parts.isEmpty()) { + return Collections.emptyList(); + } + + List result = new ArrayList(parts.size()*5); + for (int index = 0; index < parts.size(); index += 4) { + Object value = parts.get(index); + if (!allowNulls && value == null) { + continue; + } + + Object key = args.get(index/4+shiftIndex); + result.add(key); + result.add(value); + result.add(parts.get(index+1)); + result.add(parts.get(index+2)); + result.add(parts.get(index+3)); + } + return result; + } + +} From 47f8f9a4834791aa87b6f6a971520a8e7c15f289 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 11 Jul 2017 15:27:04 +0300 Subject: [PATCH 4/4] netty-transport-native-epoll version updated --- redisson-all/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redisson-all/pom.xml b/redisson-all/pom.xml index 3250ba03d..9a3b86038 100644 --- a/redisson-all/pom.xml +++ b/redisson-all/pom.xml @@ -87,7 +87,7 @@ io.netty netty-transport-native-epoll linux-x86_64 - 4.1.11.Final + 4.1.12.Final com.esotericsoftware