Merge branch 'master' into 3.0.0

pull/985/head
Nikita 8 years ago
commit 04bcc969c9

@ -87,7 +87,7 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
<version>4.1.11.Final</version>
<version>4.1.12.Final</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>

@ -143,7 +143,7 @@ public interface RedisCommands {
RedisCommand<List<Object>> EXEC = new RedisCommand<List<Object>>("EXEC", new ObjectListReplayDecoder<Object>());
RedisCommand<Boolean> SADD_BOOL = new RedisCommand<Boolean>("SADD", new BooleanAmountReplayConvertor(), 2, ValueType.OBJECTS);
RedisStrictCommand<Long> SADD = new RedisStrictCommand<Long>("SADD", 2, ValueType.OBJECTS);
RedisCommand<Integer> SADD = new RedisCommand<Integer>("SADD", new IntegerReplayConvertor(), 2, ValueType.OBJECTS);
RedisCommand<Set<Object>> SPOP = new RedisCommand<Set<Object>>("SPOP", new ObjectSetReplayDecoder<Object>());
RedisCommand<Object> SPOP_SINGLE = new RedisCommand<Object>("SPOP");
RedisCommand<Boolean> SADD_SINGLE = new RedisCommand<Boolean>("SADD", new BooleanReplayConvertor(), 2);
@ -200,12 +200,12 @@ public interface RedisCommands {
RedisCommand<Integer> SORT_TO = new RedisCommand<Integer>("SORT", new IntegerReplayConvertor());
RedisStrictCommand<Long> RPOP = new RedisStrictCommand<Long>("RPOP");
RedisStrictCommand<Long> LPUSH = new RedisStrictCommand<Long>("LPUSH", 2, ValueType.OBJECTS);
RedisCommand<Integer> LPUSH = new RedisCommand<Integer>("LPUSH", new IntegerReplayConvertor(), 2, ValueType.OBJECTS);
RedisCommand<Boolean> LPUSH_BOOLEAN = new RedisCommand<Boolean>("LPUSH", new TrueReplayConvertor(), 2, ValueType.OBJECTS);
RedisStrictCommand<Void> LPUSH_VOID = new RedisStrictCommand<Void>("LPUSH", new VoidReplayConvertor(), 2);
RedisCommand<List<Object>> LRANGE = new RedisCommand<List<Object>>("LRANGE", new ObjectListReplayDecoder<Object>());
RedisCommand<Set<Object>> LRANGE_SET = new RedisCommand<Set<Object>>("LRANGE", new ObjectSetReplayDecoder<Object>());
RedisCommand<Long> RPUSH = new RedisCommand<Long>("RPUSH", 2, ValueType.OBJECTS);
RedisCommand<Integer> RPUSH = new RedisCommand<Integer>("RPUSH", new IntegerReplayConvertor(), 2, ValueType.OBJECTS);
RedisCommand<Boolean> RPUSH_BOOLEAN = new RedisCommand<Boolean>("RPUSH", new TrueReplayConvertor(), 2, ValueType.OBJECTS);
RedisCommand<Void> RPUSH_VOID = new RedisCommand<Void>("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS);

@ -84,6 +84,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
Role role = Role.valueOf(connection.sync(RedisCommands.INFO_REPLICATION).get(ROLE_KEY));
if (Role.master.equals(role)) {
if (currentMaster.get() != null) {
stopThreads();
throw new RedisException("Multiple masters detected");
}
currentMaster.set(addr);
@ -96,6 +97,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
}
if (currentMaster.get() == null) {
stopThreads();
throw new RedisConnectionException("Can't connect to servers!");
}

@ -71,6 +71,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
this.config = create(cfg);
initTimer(this.config);
if (cfg.getMasterName() == null) {
throw new IllegalArgumentException("masterName parameter is not defined!");
}
for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts());
try {
@ -119,6 +123,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
if (currentMaster.get() == null) {
stopThreads();
throw new RedisConnectionException("Can't connect to servers!");
}
init(this.config);

@ -0,0 +1,83 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection.decoder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
*/
public class MapCacheGetAllDecoder implements MultiDecoder<List<Object>> {
private final int shiftIndex;
private final List<Object> args;
private final boolean allowNulls;
public MapCacheGetAllDecoder(List<Object> args, int shiftIndex) {
this(args, shiftIndex, false);
}
public MapCacheGetAllDecoder(List<Object> args, int shiftIndex, boolean allowNulls) {
this.args = args;
this.shiftIndex = shiftIndex;
this.allowNulls = allowNulls;
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return LongCodec.INSTANCE.getValueDecoder().decode(buf, state);
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
@Override
public List<Object> decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return Collections.emptyList();
}
List<Object> result = new ArrayList<Object>(parts.size()*5);
for (int index = 0; index < parts.size(); index += 4) {
Object value = parts.get(index);
if (!allowNulls && value == null) {
continue;
}
Object key = args.get(index/4+shiftIndex);
result.add(key);
result.add(value);
result.add(parts.get(index+1));
result.add(parts.get(index+2));
result.add(parts.get(index+3));
}
return result;
}
}

@ -35,6 +35,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.command.CommandReactiveExecutor;
@ -68,7 +69,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
@Override
public Publisher<Integer> size() {
return commandExecutor.readReactive(getName(), codec, LLEN, getName());
return commandExecutor.readReactive(getName(), codec, RedisCommands.LLEN_INT, getName());
}
@Override
@ -220,7 +221,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
List<Object> args = new ArrayList<Object>(coll.size() + 1);
args.add(index);
args.addAll(coll);
return commandExecutor.evalWriteReactive(getName(), codec, new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5, ValueType.OBJECTS),
return commandExecutor.evalWriteReactive(getName(), codec, new RedisCommand<Integer>("EVAL", new IntegerReplayConvertor(), 5, ValueType.OBJECTS),
"local ind = table.remove(ARGV, 1); " + // index is the first parameter
"local size = redis.call('llen', KEYS[1]); " +
"assert(tonumber(ind) <= size, 'index: ' .. ind .. ' but current size: ' .. size); " +
@ -332,8 +333,8 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
}).count().next().poll();
boolean res = count.equals(Streams.wrap(size()).next().poll());
res &= count.equals(Streams.wrap(((RedissonListReactive<Object>) o).size()).next().poll());
boolean res = count.intValue() == Streams.wrap(size()).next().poll();
res &= count.intValue() == Streams.wrap(((RedissonListReactive<Object>) o).size()).next().poll();
return res;
}

@ -71,7 +71,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
@Override
public Publisher<Integer> size() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.ZCARD, getName());
return commandExecutor.readReactive(getName(), codec, RedisCommands.ZCARD_INT, getName());
}
@Override
@ -116,7 +116,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
@Override
public Publisher<Integer> add(V value) {
long timeoutDate = 92233720368547758L;
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_LONG,
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_INTEGER,
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
+ "if expireDateScore ~= false and tonumber(expireDateScore) > tonumber(ARGV[1]) then "
+ "return 0;"

@ -63,7 +63,7 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
@Override
public Publisher<Integer> size() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.SCARD, getName());
return commandExecutor.readReactive(getName(), codec, RedisCommands.SCARD_INT, getName());
}
@Override

@ -7,7 +7,7 @@ import org.redisson.config.Config;
public class CommandHandlersTest extends BaseTest {
@Test(expected = RedisException.class)
@Test(expected = RuntimeException.class)
public void testEncoder() throws InterruptedException {
Config config = createConfig();
config.setCodec(new ErrorsCodec());
@ -17,7 +17,7 @@ public class CommandHandlersTest extends BaseTest {
redisson.getBucket("1234").set("1234");
}
@Test(expected = RedisException.class)
@Test(expected = RuntimeException.class)
public void testDecoder() {
redisson.getBucket("1234").set("1234");

Loading…
Cancel
Save