Merge branch 'mrniko/master'

pull/427/head
jackygurui 9 years ago
commit 54aac5abfe

@ -15,7 +15,6 @@
*/
package org.redisson;
class BaseConfig<T extends BaseConfig<T>> {
/**
@ -113,6 +112,7 @@ class BaseConfig<T extends BaseConfig<T>> {
this.subscriptionsPerConnection = subscriptionsPerConnection;
return (T) this;
}
public int getSubscriptionsPerConnection() {
return subscriptionsPerConnection;
}
@ -127,6 +127,7 @@ class BaseConfig<T extends BaseConfig<T>> {
this.password = password;
return (T) this;
}
public String getPassword() {
return password;
}
@ -142,6 +143,7 @@ class BaseConfig<T extends BaseConfig<T>> {
this.retryAttempts = retryAttempts;
return (T) this;
}
public int getRetryAttempts() {
return retryAttempts;
}
@ -156,6 +158,7 @@ class BaseConfig<T extends BaseConfig<T>> {
this.retryInterval = retryInterval;
return (T) this;
}
public int getRetryInterval() {
return retryInterval;
}
@ -169,6 +172,7 @@ class BaseConfig<T extends BaseConfig<T>> {
this.timeout = timeout;
return (T) this;
}
public int getTimeout() {
return timeout;
}
@ -183,11 +187,11 @@ class BaseConfig<T extends BaseConfig<T>> {
this.clientName = clientName;
return (T) this;
}
public String getClientName() {
return clientName;
}
/**
* Ping timeout used in <code>Node.ping</code> and <code>Node.pingAll<code> operation
*
@ -197,13 +201,14 @@ class BaseConfig<T extends BaseConfig<T>> {
this.pingTimeout = pingTimeout;
return (T) this;
}
public int getPingTimeout() {
return pingTimeout;
}
/**
* Timeout during connecting to any Redis server.
*
* <p/>
* @param connectTimeout - timeout in milliseconds
* @return
*/
@ -211,6 +216,7 @@ class BaseConfig<T extends BaseConfig<T>> {
this.connectTimeout = connectTimeout;
return (T) this;
}
public int getConnectTimeout() {
return connectTimeout;
}
@ -227,6 +233,7 @@ class BaseConfig<T extends BaseConfig<T>> {
this.idleConnectionTimeout = idleConnectionTimeout;
return (T) this;
}
public int getIdleConnectionTimeout() {
return idleConnectionTimeout;
}
@ -234,10 +241,10 @@ class BaseConfig<T extends BaseConfig<T>> {
/**
* Reconnection attempt timeout to Redis server when
* it has been excluded from internal list of available servers.
*
* <p/>
* On every such timeout event Redisson tries
* to connect to disconnected Redis server.
*
* <p/>
* Default is 3000
*
* @see #failedAttempts
@ -246,8 +253,9 @@ class BaseConfig<T extends BaseConfig<T>> {
public T setReconnectionTimeout(int slaveRetryTimeout) {
this.reconnectionTimeout = slaveRetryTimeout;
return (T)this;
return (T) this;
}
public int getReconnectionTimeout() {
return reconnectionTimeout;
}
@ -256,14 +264,15 @@ class BaseConfig<T extends BaseConfig<T>> {
* Redis server will be excluded from the internal list of available nodes
* when sequential unsuccessful execution attempts of any Redis command
* on this server reaches <code>failedAttempts</code>.
*
* <p/>
* Default is 3
*
*/
public T setFailedAttempts(int slaveFailedAttempts) {
this.failedAttempts = slaveFailedAttempts;
return (T)this;
return (T) this;
}
public int getFailedAttempts() {
return failedAttempts;
}

@ -79,10 +79,10 @@ public class Config {
if (oldConf.getMasterSlaveServersConfig() != null) {
setMasterSlaveServersConfig(new MasterSlaveServersConfig(oldConf.getMasterSlaveServersConfig()));
}
if (oldConf.getSentinelServersConfig() != null ) {
if (oldConf.getSentinelServersConfig() != null) {
setSentinelServersConfig(new SentinelServersConfig(oldConf.getSentinelServersConfig()));
}
if (oldConf.getClusterServersConfig() != null ) {
if (oldConf.getClusterServersConfig() != null) {
setClusterServersConfig(new ClusterServersConfig(oldConf.getClusterServersConfig()));
}
if (oldConf.getElasticacheServersConfig() != null) {
@ -100,6 +100,7 @@ public class Config {
this.codec = codec;
return this;
}
public Codec getCodec() {
return codec;
}
@ -131,10 +132,10 @@ public class Config {
return clusterServersConfig;
}
ClusterServersConfig getClusterServersConfig() {
return clusterServersConfig;
}
void setClusterServersConfig(ClusterServersConfig clusterServersConfig) {
this.clusterServersConfig = clusterServersConfig;
}
@ -169,6 +170,7 @@ public class Config {
ElasticacheServersConfig getElasticacheServersConfig() {
return elasticacheServersConfig;
}
void setElasticacheServersConfig(ElasticacheServersConfig elasticacheServersConfig) {
this.elasticacheServersConfig = elasticacheServersConfig;
}
@ -200,10 +202,10 @@ public class Config {
return singleServerConfig;
}
SingleServerConfig getSingleServerConfig() {
return singleServerConfig;
}
void setSingleServerConfig(SingleServerConfig singleConnectionConfig) {
this.singleServerConfig = singleConnectionConfig;
}
@ -237,6 +239,7 @@ public class Config {
SentinelServersConfig getSentinelServersConfig() {
return sentinelServersConfig;
}
void setSentinelServersConfig(SentinelServersConfig sentinelConnectionConfig) {
this.sentinelServersConfig = sentinelConnectionConfig;
}
@ -270,6 +273,7 @@ public class Config {
MasterSlaveServersConfig getMasterSlaveServersConfig() {
return masterSlaveServersConfig;
}
void setMasterSlaveServersConfig(MasterSlaveServersConfig masterSlaveConnectionConfig) {
this.masterSlaveServersConfig = masterSlaveConnectionConfig;
}
@ -339,6 +343,7 @@ public class Config {
this.useLinuxNativeEpoll = useLinuxNativeEpoll;
return this;
}
public boolean isUseLinuxNativeEpoll() {
return useLinuxNativeEpoll;
}
@ -360,6 +365,7 @@ public class Config {
this.eventLoopGroup = eventLoopGroup;
return this;
}
public EventLoopGroup getEventLoopGroup() {
return eventLoopGroup;
}

@ -282,7 +282,7 @@ public class RedissonKeys implements RKeys {
}
@Override
public Long count() {
public long count() {
return commandExecutor.get(countAsync());
}

@ -56,11 +56,11 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
public static final RedisCommand<Boolean> EVAL_BOOLEAN_ARGS2 = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5, ValueType.OBJECTS);
protected RedissonList(CommandAsyncExecutor commandExecutor, String name) {
public RedissonList(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
protected RedissonList(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
public RedissonList(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}

@ -35,9 +35,9 @@ public interface RListReactive<V> extends RCollectionReactive<V> {
Publisher<V> iterator(int startIndex);
Publisher<Integer> lastIndexOf(Object o);
Publisher<Long> lastIndexOf(Object o);
Publisher<Integer> indexOf(Object o);
Publisher<Long> indexOf(Object o);
Publisher<Long> add(long index, V element);
@ -49,6 +49,6 @@ public interface RListReactive<V> extends RCollectionReactive<V> {
Publisher<V> get(long index);
Publisher<V> remove(int index);
Publisher<V> remove(long index);
}

@ -66,9 +66,11 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<Object, Obj
for (Object param : msg.getParams()) {
Encoder encoder = paramsEncoder;
if (msg.getCommand().getInParamType().size() == 1) {
if (msg.getCommand().getInParamIndex() == i && msg.getCommand().getInParamType().get(0) == ValueType.OBJECT) {
if (msg.getCommand().getInParamIndex() == i
&& msg.getCommand().getInParamType().get(0) == ValueType.OBJECT) {
encoder = msg.getCodec().getValueEncoder();
} else if (msg.getCommand().getInParamIndex() <= i && msg.getCommand().getInParamType().get(0) != ValueType.OBJECT) {
} else if (msg.getCommand().getInParamIndex() <= i
&& msg.getCommand().getInParamType().get(0) != ValueType.OBJECT) {
encoder = selectEncoder(msg, i - msg.getCommand().getInParamIndex());
}
} else {
@ -124,31 +126,27 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<Object, Obj
out.writeBytes(CRLF);
}
final static char[] DigitTens = {'0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '1', '1', '1',
'1', '1', '1', '1', '1', '1', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '3', '3', '3',
'3', '3', '3', '3', '3', '3', '3', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '5', '5',
'5', '5', '5', '5', '5', '5', '5', '5', '6', '6', '6', '6', '6', '6', '6', '6', '6', '6', '7',
'7', '7', '7', '7', '7', '7', '7', '7', '7', '8', '8', '8', '8', '8', '8', '8', '8', '8', '8',
'9', '9', '9', '9', '9', '9', '9', '9', '9', '9',};
static final char[] DIGITTENS = { '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '1', '1', '1', '1', '1',
'1', '1', '1', '1', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '3', '3', '3', '3', '3', '3', '3',
'3', '3', '3', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '5', '5', '5', '5', '5', '5', '5', '5',
'5', '5', '6', '6', '6', '6', '6', '6', '6', '6', '6', '6', '7', '7', '7', '7', '7', '7', '7', '7', '7',
'7', '8', '8', '8', '8', '8', '8', '8', '8', '8', '8', '9', '9', '9', '9', '9', '9', '9', '9', '9', '9', };
final static char[] DigitOnes = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3',
'4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2',
'3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1',
'2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0',
'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',};
static final char[] DIGITONES = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5',
'6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6',
'7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7',
'8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8',
'9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', };
final static char[] digits = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
'y', 'z'};
static final char[] DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g',
'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z' };
final static int[] sizeTable = {9, 99, 999, 9999, 99999, 999999, 9999999, 99999999, 999999999,
Integer.MAX_VALUE};
static final int[] SIZETABLE = { 9, 99, 999, 9999, 99999, 999999, 9999999, 99999999, 999999999, Integer.MAX_VALUE };
// Requires positive x
static int stringSize(long x) {
for (int i = 0;; i++)
if (x <= sizeTable[i])
if (x <= SIZETABLE[i])
return i + 1;
}
@ -168,8 +166,8 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<Object, Obj
// really: r = i - (q * 100);
r = i - ((q << 6) + (q << 5) + (q << 2));
i = q;
buf[--charPos] = (byte) DigitOnes[(int)r];
buf[--charPos] = (byte) DigitTens[(int)r];
buf[--charPos] = (byte) DIGITONES[(int) r];
buf[--charPos] = (byte) DIGITTENS[(int) r];
}
// Fall thru to fast mode for smaller numbers
@ -177,7 +175,7 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<Object, Obj
for (;;) {
q = (i * 52429) >>> (16 + 3);
r = i - ((q << 3) + (q << 1)); // r = i-(q*10) ...
buf[--charPos] = (byte) digits[(int)r];
buf[--charPos] = (byte) DIGITS[(int) r];
i = q;
if (i == 0)
break;
@ -194,5 +192,4 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<Object, Obj
return buf;
}
}

@ -24,8 +24,8 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
* @date 2015-10-16
*/
public class CborJacksonCodec extends JsonJacksonCodec {
@Override
protected ObjectMapper initObjectMapper() {
return new ObjectMapper(new CBORFactory());
}
@Override
protected ObjectMapper initObjectMapper() {
return new ObjectMapper(new CBORFactory());
}
}

@ -73,8 +73,7 @@ public class JsonJacksonCodec implements Codec {
init(mapObjectMapper);
// type info inclusion
TypeResolverBuilder<?> mapTyper = new DefaultTypeResolverBuilder(DefaultTyping.NON_FINAL) {
public boolean useForType(JavaType t)
{
public boolean useForType(JavaType t) {
switch (_appliesFor) {
case NON_CONCRETE_AND_ARRAYS:
while (t.isArrayType()) {
@ -93,7 +92,7 @@ public class JsonJacksonCodec implements Codec {
}
return !t.isFinal(); // includes Object.class
default:
//case JAVA_LANG_OBJECT:
// case JAVA_LANG_OBJECT:
return (t.getRawClass() == Object.class);
}
}
@ -106,10 +105,9 @@ public class JsonJacksonCodec implements Codec {
protected void init(ObjectMapper objectMapper) {
objectMapper.setSerializationInclusion(Include.NON_NULL);
objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()
.withFieldVisibility(JsonAutoDetect.Visibility.ANY)
.withGetterVisibility(JsonAutoDetect.Visibility.NONE)
.withSetterVisibility(JsonAutoDetect.Visibility.NONE)
.withCreatorVisibility(JsonAutoDetect.Visibility.NONE));
.withFieldVisibility(JsonAutoDetect.Visibility.ANY).withGetterVisibility(JsonAutoDetect.Visibility.NONE)
.withSetterVisibility(JsonAutoDetect.Visibility.NONE)
.withCreatorVisibility(JsonAutoDetect.Visibility.NONE));
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true);
objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);

@ -95,7 +95,7 @@ public interface RKeys extends RKeysAsync {
*
* @return
*/
Long count();
long count();
/**
* Delete all keys of currently selected database

@ -35,8 +35,7 @@ import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
/**
* Guarantees multiple locks operation handling (lock, tryLock...)
* in atomic way without deadlocks.
* Groups multiple independent locks and handles them as one lock.
*
* @author Nikita Koksharov
*

@ -39,7 +39,7 @@ public class RedissonKeysReactive implements RKeysReactive {
private final CommandReactiveService commandExecutor;
RedissonKeys instance;
private final RedissonKeys instance;
public RedissonKeysReactive(CommandReactiveService commandExecutor) {
super();

@ -30,14 +30,13 @@ import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.RedissonList;
import org.redisson.api.RListReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.command.CommandReactiveExecutor;
@ -56,12 +55,16 @@ import reactor.rx.subscription.ReactiveSubscription;
*/
public class RedissonListReactive<V> extends RedissonExpirableReactive implements RListReactive<V> {
private final RedissonList<V> instance;
public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonList<V>(commandExecutor, name);
}
public RedissonListReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonList<V>(codec, commandExecutor, name);
}
@Override
@ -151,7 +154,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
@Override
public Publisher<Boolean> remove(Object o) {
return remove(o, 1);
return reactive(instance.removeAsync(o));
}
protected Publisher<Boolean> remove(Object o, int count) {
@ -160,17 +163,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
@Override
public Publisher<Boolean> containsAll(Collection<?> c) {
return commandExecutor.evalReadReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local items = redis.call('lrange', KEYS[1], 0, -1) " +
"for i=1, #items do " +
"for j = 0, table.getn(ARGV), 1 do " +
"if items[i] == ARGV[j] then " +
"table.remove(ARGV, j) " +
"end " +
"end " +
"end " +
"return table.getn(ARGV) == 0 and 1 or 0",
Collections.<Object>singletonList(getName()), c.toArray());
return reactive(instance.containsAllAsync(c));
}
@Override
@ -232,40 +225,12 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
@Override
public Publisher<Boolean> removeAll(Collection<?> c) {
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local v = 0 " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 1 "
+ "then v = 1 end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());
return reactive(instance.removeAllAsync(c));
}
@Override
public Publisher<Boolean> retainAll(Collection<?> c) {
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local changed = 0 " +
"local items = redis.call('lrange', KEYS[1], 0, -1) "
+ "local i = 1 "
+ "local s = table.getn(items) "
+ "while i <= s do "
+ "local element = items[i] "
+ "local isInAgrs = false "
+ "for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "
+ "end "
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('LREM', KEYS[1], 0, element) "
+ "changed = 1 "
+ "end "
+ "i = i + 1 "
+ "end "
+ "return changed ",
Collections.<Object>singletonList(getName()), c.toArray());
return reactive(instance.retainAllAsync(c));
}
@Override
@ -293,7 +258,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
@Override
public Publisher<V> remove(int index) {
public Publisher<V> remove(long index) {
if (index == 0) {
return commandExecutor.writeReactive(getName(), codec, LPOP, getName());
}
@ -309,7 +274,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
@Override
public Publisher<Boolean> contains(Object o) {
return indexOf(o, new BooleanNumberReplayConvertor(-1L));
return reactive(instance.containsAsync(o));
}
private <R> Publisher<R> indexOf(Object o, Convertor<R> convertor) {
@ -327,13 +292,13 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
@Override
public Publisher<Integer> indexOf(Object o) {
return indexOf(o, new IntegerReplayConvertor());
public Publisher<Long> indexOf(Object o) {
return indexOf(o, new LongReplayConvertor());
}
@Override
public Publisher<Integer> lastIndexOf(Object o) {
return commandExecutor.evalReadReactive(getName(), codec, new RedisCommand<Integer>("EVAL", new IntegerReplayConvertor(), 4),
public Publisher<Long> lastIndexOf(Object o) {
return commandExecutor.evalReadReactive(getName(), codec, new RedisCommand<Integer>("EVAL", 4),
"local key = KEYS[1] " +
"local obj = ARGV[1] " +
"local items = redis.call('lrange', key, 0, -1) " +

@ -229,7 +229,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(0));
sync(list.add(10));
int index = sync(list.lastIndexOf(3));
long index = sync(list.lastIndexOf(3));
Assert.assertEquals(2, index);
}
@ -247,7 +247,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(0));
sync(list.add(10));
int index = sync(list.lastIndexOf(3));
long index = sync(list.lastIndexOf(3));
Assert.assertEquals(5, index);
}
@ -265,7 +265,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(3));
sync(list.add(10));
int index = sync(list.lastIndexOf(3));
long index = sync(list.lastIndexOf(3));
Assert.assertEquals(8, index);
}

Loading…
Cancel
Save