Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java
pull/1303/head
Nikita 7 years ago
commit a66cf86fc1

@ -60,7 +60,8 @@ public class RedissonAtomicDouble extends RedissonExpirable implements RAtomicDo
@Override @Override
public RFuture<Boolean> compareAndSetAsync(double expect, double update) { public RFuture<Boolean> compareAndSetAsync(double expect, double update) {
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if tonumber(redis.call('get', KEYS[1])) == tonumber(ARGV[1]) then " "local value = redis.call('get', KEYS[1]);"
+ "if (value == false and tonumber(ARGV[1]) == 0) or (tonumber(value) == tonumber(ARGV[1])) then "
+ "redis.call('set', KEYS[1], ARGV[2]); " + "redis.call('set', KEYS[1], ARGV[2]); "
+ "return 1 " + "return 1 "
+ "else " + "else "
@ -80,12 +81,26 @@ public class RedissonAtomicDouble extends RedissonExpirable implements RAtomicDo
@Override @Override
public double get() { public double get() {
return addAndGet(0); return get(getAsync());
}
@Override
public double getAndDelete() {
return get(getAndDeleteAsync());
}
@Override
public RFuture<Double> getAndDeleteAsync() {
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_DOUBLE,
"local currValue = redis.call('get', KEYS[1]); "
+ "redis.call('del', KEYS[1]); "
+ "return currValue; ",
Collections.<Object>singletonList(getName()));
} }
@Override @Override
public RFuture<Double> getAsync() { public RFuture<Double> getAsync() {
return addAndGetAsync(0); return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.GET_DOUBLE, getName());
} }
@Override @Override

@ -67,6 +67,20 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
Collections.<Object>singletonList(getName()), expect, update); Collections.<Object>singletonList(getName()), expect, update);
} }
@Override
public long getAndDelete() {
return get(getAndDeleteAsync());
}
@Override
public RFuture<Long> getAndDeleteAsync() {
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_LONG_SAFE,
"local currValue = redis.call('get', KEYS[1]); "
+ "redis.call('del', KEYS[1]); "
+ "return currValue; ",
Collections.<Object>singletonList(getName()));
}
@Override @Override
public long decrementAndGet() { public long decrementAndGet() {
return get(decrementAndGetAsync()); return get(decrementAndGetAsync());
@ -79,12 +93,12 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
@Override @Override
public long get() { public long get() {
return addAndGet(0); return get(getAsync());
} }
@Override @Override
public RFuture<Long> getAsync() { public RFuture<Long> getAsync() {
return addAndGetAsync(0); return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.GET_LONG, getName());
} }
@Override @Override

@ -15,17 +15,13 @@
*/ */
package org.redisson; package org.redisson;
import java.util.Arrays;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RSemaphore; import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.LongAdder;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -109,20 +105,17 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
public RFuture<T> sumAsync() { public RFuture<T> sumAsync() {
final RPromise<T> result = new RedissonPromise<T>(); final RPromise<T> result = new RedissonPromise<T>();
RFuture<Integer> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_INTEGER, RFuture<Long> future = topic.publishAsync(SUM_MSG);
"redis.call('del', KEYS[1]); " future.addListener(new FutureListener<Long>() {
+ "return redis.call('publish', KEYS[2], ARGV[1]); ",
Arrays.<Object>asList(getName(), topic.getChannelNames().get(0)), SUM_MSG);
future.addListener(new FutureListener<Integer>() {
@Override @Override
public void operationComplete(Future<Integer> future) throws Exception { public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
result.tryFailure(future.cause()); result.tryFailure(future.cause());
return; return;
} }
semaphore.acquireAsync(future.getNow()).addListener(new FutureListener<Void>() { semaphore.acquireAsync(future.getNow().intValue()).addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
@ -130,7 +123,7 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
return; return;
} }
RFuture<T> valueFuture = getAsync(); RFuture<T> valueFuture = getAndDeleteAsync();
valueFuture.addListener(new FutureListener<T>() { valueFuture.addListener(new FutureListener<T>() {
@Override @Override
public void operationComplete(Future<T> future) throws Exception { public void operationComplete(Future<T> future) throws Exception {
@ -153,21 +146,22 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
public RFuture<Void> resetAsync() { public RFuture<Void> resetAsync() {
final RPromise<Void> result = new RedissonPromise<Void>(); final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Integer> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_INTEGER, RFuture<Long> future = topic.publishAsync(CLEAR_MSG);
"redis.call('del', KEYS[1]); " future.addListener(new FutureListener<Long>() {
+ "return redis.call('publish', KEYS[2], ARGV[1]); ",
Arrays.<Object>asList(getName(), topic.getChannelNames().get(0)), CLEAR_MSG);
future.addListener(new FutureListener<Integer>() {
@Override @Override
public void operationComplete(Future<Integer> future) throws Exception { public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
result.tryFailure(future.cause()); result.tryFailure(future.cause());
return; return;
} }
semaphore.acquireAsync(future.getNow()).addListener(new FutureListener<Void>() { int value = 0;
if (future.getNow() != null) {
value = future.getNow().intValue();
}
semaphore.acquireAsync(value).addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
@ -190,6 +184,6 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
protected abstract RFuture<T> addAndGetAsync(); protected abstract RFuture<T> addAndGetAsync();
protected abstract RFuture<T> getAsync(); protected abstract RFuture<T> getAndDeleteAsync();
} }

@ -67,6 +67,7 @@ public class RedissonBatch implements RBatch {
private int syncSlaves; private int syncSlaves;
private long syncTimeout; private long syncTimeout;
private boolean skipResult; private boolean skipResult;
private boolean atomic;
protected RedissonBatch(UUID id, EvictionScheduler evictionScheduler, ConnectionManager connectionManager) { protected RedissonBatch(UUID id, EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
this.executorService = new CommandBatchService(connectionManager); this.executorService = new CommandBatchService(connectionManager);
@ -241,6 +242,12 @@ public class RedissonBatch implements RBatch {
return this; return this;
} }
@Override
public RBatch atomic() {
this.atomic = true;
return this;
}
@Override @Override
public RBatch skipResult() { public RBatch skipResult() {
this.skipResult = true; this.skipResult = true;
@ -267,22 +274,22 @@ public class RedissonBatch implements RBatch {
@Override @Override
public BatchResult<?> execute() { public BatchResult<?> execute() {
return executorService.execute(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval); return executorService.execute(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval, atomic);
} }
@Override @Override
public void executeSkipResult() { public void executeSkipResult() {
executorService.execute(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval); executorService.execute(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval, atomic);
} }
@Override @Override
public RFuture<Void> executeSkipResultAsync() { public RFuture<Void> executeSkipResultAsync() {
return executorService.executeAsync(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval); return executorService.executeAsync(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval, atomic);
} }
@Override @Override
public RFuture<BatchResult<?>> executeAsync() { public RFuture<BatchResult<?>> executeAsync() {
return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval); return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval, atomic);
} }
@Override @Override

@ -102,6 +102,20 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
return commandExecutor.readAsync(getName(), codec, RedisCommands.GET, getName()); return commandExecutor.readAsync(getName(), codec, RedisCommands.GET, getName());
} }
@Override
public V getAndDelete() {
return get(getAndDeleteAsync());
}
@Override
public RFuture<V> getAndDeleteAsync() {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT,
"local currValue = redis.call('get', KEYS[1]); "
+ "redis.call('del', KEYS[1]); "
+ "return currValue; ",
Collections.<Object>singletonList(getName()));
}
@Override @Override
public long size() { public long size() {
return get(sizeAsync()); return get(sizeAsync());

@ -50,8 +50,8 @@ public class RedissonDoubleAdder extends RedissonBaseAdder<Double> implements RD
} }
@Override @Override
protected RFuture<Double> getAsync() { protected RFuture<Double> getAndDeleteAsync() {
return atomicDouble.getAsync(); return atomicDouble.getAndDeleteAsync();
} }
@Override @Override

@ -30,6 +30,7 @@ import org.redisson.api.RGeo;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.CodecDecoder; import org.redisson.client.protocol.decoder.CodecDecoder;
@ -91,7 +92,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
params.add(entry.getLatitude()); params.add(entry.getLatitude());
params.add(encode(entry.getMember())); params.add(encode(entry.getMember()));
} }
return commandExecutor.writeAsync(getName(), codec, RedisCommands.GEOADD_ENTRIES, params.toArray()); return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.GEOADD_ENTRIES, params.toArray());
} }
@Override @Override
@ -101,7 +102,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override @Override
public RFuture<Double> distAsync(V firstMember, V secondMember, GeoUnit geoUnit) { public RFuture<Double> distAsync(V firstMember, V secondMember, GeoUnit geoUnit) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.GEODIST, getName(), encode(firstMember), encode(secondMember), geoUnit); return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.GEODIST, getName(), encode(firstMember), encode(secondMember), geoUnit);
} }
@Override @Override
@ -117,7 +118,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
params.add(encode(member)); params.add(encode(member));
} }
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEOHASH", new MapGetAllDecoder((List<Object>)Arrays.asList(members), 0)); RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEOHASH", new MapGetAllDecoder((List<Object>)Arrays.asList(members), 0));
return commandExecutor.readAsync(getName(), codec, command, params.toArray()); return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, command, params.toArray());
} }
@Override @Override
@ -135,7 +136,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
MultiDecoder<Map<Object, Object>> decoder = new ListMultiDecoder(new GeoPositionDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new GeoPositionMapDecoder((List<Object>)Arrays.asList(members))); MultiDecoder<Map<Object, Object>> decoder = new ListMultiDecoder(new GeoPositionDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new GeoPositionMapDecoder((List<Object>)Arrays.asList(members)));
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEOPOS", decoder); RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEOPOS", decoder);
return commandExecutor.readAsync(getName(), codec, command, params.toArray()); return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, command, params.toArray());
} }
@Override @Override

@ -68,6 +68,7 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler; import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.Hash; import org.redisson.misc.Hash;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -76,7 +77,6 @@ import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
/** /**
* *
@ -239,6 +239,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override @Override
public void operationComplete(Future<Boolean> future) throws Exception { public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
log.error("Can't check existance", future.cause());
return; return;
} }
@ -427,7 +428,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override @Override
protected RFuture<V> putOperationAsync(K key, V value) { protected RFuture<V> putOperationAsync(K key, V value) {
ByteBuf mapKey = encodeMapKey(key); ByteBuf mapKey = encodeMapKey(key);
ByteBuf mapValue = encodeMapKey(value); ByteBuf mapValue = encodeMapValue(value);
CacheKey cacheKey = toCacheKey(mapKey); CacheKey cacheKey = toCacheKey(mapKey);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
ByteBuf msg = createSyncMessage(mapKey, mapValue, cacheKey); ByteBuf msg = createSyncMessage(mapKey, mapValue, cacheKey);
@ -445,7 +446,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "end;" + "end;"
+ "return v; ", + "return v; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
mapKey, encodeMapValue(value), msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId); mapKey, mapValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId);
} }
protected ByteBuf createSyncMessage(ByteBuf mapKey, ByteBuf mapValue, CacheKey cacheKey) { protected ByteBuf createSyncMessage(ByteBuf mapKey, ByteBuf mapValue, CacheKey cacheKey) {
@ -953,7 +954,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
} }
final RPromise<Map<K, V>> promise = newPromise(); final RPromise<Map<K, V>> promise = new RedissonPromise<Map<K, V>>();
RFuture<Map<K, V>> future = super.getAllAsync(mapKeys); RFuture<Map<K, V>> future = super.getAllAsync(mapKeys);
future.addListener(new FutureListener<Map<K, V>>() { future.addListener(new FutureListener<Map<K, V>>() {
@Override @Override
@ -984,19 +985,16 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
@Override @Override
public RFuture<Void> putAllOperationAsync(final Map<? extends K, ? extends V> map) { protected RFuture<Void> putAllOperationAsync(final Map<? extends K, ? extends V> map) {
List<Object> params = new ArrayList<Object>(map.size()*3); List<Object> params = new ArrayList<Object>(map.size()*3);
params.add(invalidateEntryOnChange); params.add(invalidateEntryOnChange);
params.add(map.size()*2); params.add(map.size()*2);
byte[][] hashes = new byte[map.size()][]; byte[][] hashes = new byte[map.size()][];
int i = 0; int i = 0;
int payloadSize = 0;
for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) { for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
ByteBuf mapKey = encodeMapKey(t.getKey()); ByteBuf mapKey = encodeMapKey(t.getKey());
payloadSize += mapKey.readableBytes();
ByteBuf mapValue = encodeMapValue(t.getValue()); ByteBuf mapValue = encodeMapValue(t.getValue());
payloadSize += mapValue.readableBytes();
params.add(mapKey); params.add(mapKey);
params.add(mapValue); params.add(mapValue);
CacheKey cacheKey = toCacheKey(mapKey); CacheKey cacheKey = toCacheKey(mapKey);
@ -1004,7 +1002,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
i++; i++;
} }
ByteBuf msgEncoded; ByteBuf msgEncoded = null;
if (syncStrategy == SyncStrategy.UPDATE) { if (syncStrategy == SyncStrategy.UPDATE) {
List<LocalCachedMapUpdate.Entry> entries = new ArrayList<LocalCachedMapUpdate.Entry>(); List<LocalCachedMapUpdate.Entry> entries = new ArrayList<LocalCachedMapUpdate.Entry>();
for (int j = 2; j < params.size(); j += 2) { for (int j = 2; j < params.size(); j += 2) {
@ -1024,23 +1022,25 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] entryId = generateLogEntryId(hash); byte[] entryId = generateLogEntryId(hash);
params.add(time); params.add(time);
params.add(entryId); params.add(entryId);
payloadSize += entryId.length + 8;
} }
} }
params.add(msgEncoded); if (msgEncoded != null) {
payloadSize += msgEncoded.readableBytes(); params.add(msgEncoded);
}
log.debug("Payload size passed to putAll method: {}", payloadSize);
final RPromise<Void> result = newPromise(); final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Void> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, RFuture<Void> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));" "for i=3, tonumber(ARGV[2]) + 2, 5000 do "
+ "redis.call('hmset', KEYS[1], unpack(ARGV, i, math.min(i+4999, tonumber(ARGV[2]) + 2))); "
+ "end; "
+ "if ARGV[1] == '1' then " + "if ARGV[1] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[#ARGV]); " + "redis.call('publish', KEYS[2], ARGV[#ARGV]); "
+ "end;" + "end;"
+ "if ARGV[1] == '2' then " + "if ARGV[1] == '2' then "
+ "redis.call('zadd', KEYS[3], unpack(ARGV, tonumber(ARGV[2]) + 2 + 1, #ARGV - 1));" + "for i=tonumber(ARGV[2]) + 2 + 1, #ARGV - 1, 5000 do "
+ "redis.call('hmset', KEYS[3], unpack(ARGV, i, math.min(i+4999, #ARGV - 1))); "
+ "end; "
+ "redis.call('publish', KEYS[2], ARGV[#ARGV]); " + "redis.call('publish', KEYS[2], ARGV[#ARGV]); "
+ "end;", + "end;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
@ -1050,6 +1050,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
result.tryFailure(future.cause());
return; return;
} }
@ -1125,7 +1126,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
result.add((V) value.getValue()); result.add((V) value.getValue());
} }
final RPromise<Collection<V>> promise = newPromise(); final RPromise<Collection<V>> promise = new RedissonPromise<Collection<V>>();
RFuture<Collection<V>> future = commandExecutor.evalReadAsync(getName(), codec, ALL_KEYS, RFuture<Collection<V>> future = commandExecutor.evalReadAsync(getName(), codec, ALL_KEYS,
"local entries = redis.call('hgetall', KEYS[1]); " "local entries = redis.call('hgetall', KEYS[1]); "
+ "local result = {};" + "local result = {};"
@ -1172,13 +1173,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
result.put((K)value.getKey(), (V)value.getValue()); result.put((K)value.getKey(), (V)value.getValue());
} }
final RPromise<Map<K, V>> promise = newPromise(); final RPromise<Map<K, V>> promise = new RedissonPromise<Map<K, V>>();
RFuture<Map<K, V>> future = readAll(ALL_MAP, mapKeys, result); RFuture<Map<K, V>> future = readAll(ALL_MAP, mapKeys, result);
future.addListener(new FutureListener<Map<K, V>>() { future.addListener(new FutureListener<Map<K, V>>() {
@Override @Override
public void operationComplete(Future<Map<K, V>> future) throws Exception { public void operationComplete(Future<Map<K, V>> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return; return;
} }
@ -1215,13 +1217,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
result.add(new AbstractMap.SimpleEntry<K, V>((K)value.getKey(), (V)value.getValue())); result.add(new AbstractMap.SimpleEntry<K, V>((K)value.getKey(), (V)value.getValue()));
} }
final RPromise<Set<Entry<K, V>>> promise = newPromise(); final RPromise<Set<Entry<K, V>>> promise = new RedissonPromise<Set<Entry<K, V>>>();
RFuture<Set<Entry<K, V>>> future = readAll(ALL_ENTRIES, mapKeys, result); RFuture<Set<Entry<K, V>>> future = readAll(ALL_ENTRIES, mapKeys, result);
future.addListener(new FutureListener<Set<Entry<K, V>>>() { future.addListener(new FutureListener<Set<Entry<K, V>>>() {
@Override @Override
public void operationComplete(Future<Set<Entry<K, V>>> future) throws Exception { public void operationComplete(Future<Set<Entry<K, V>>> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return; return;
} }

@ -49,8 +49,8 @@ public class RedissonLongAdder extends RedissonBaseAdder<Long> implements RLongA
} }
@Override @Override
protected RFuture<Long> getAsync() { protected RFuture<Long> getAndDeleteAsync() {
return atomicLong.getAsync(); return atomicLong.getAndDeleteAsync();
} }
@Override @Override

@ -87,6 +87,8 @@ public class RedissonReadLock extends RedissonLock implements RLock {
@Override @Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) { protected RFuture<Boolean> unlockInnerAsync(long threadId) {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = timeoutPrefix.split(":" + getLockName(threadId))[0];
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " + "local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " + "if (mode == false) then " +
@ -129,7 +131,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"redis.call('del', KEYS[1]); " + "redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; ", "return 1; ",
Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, timeoutPrefix.split(":")[0]), Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix),
LockPubSub.unlockMessage, getLockName(threadId)); LockPubSub.unlockMessage, getLockName(threadId));
} }

@ -63,6 +63,13 @@ public interface RAtomicDouble extends RExpirable, RAtomicDoubleAsync {
*/ */
double get(); double get();
/**
* Gets and deletes object
*
* @return the current value
*/
double getAndDelete();
/** /**
* Atomically adds the given value to the current value. * Atomically adds the given value to the current value.
* *

@ -15,6 +15,11 @@
*/ */
package org.redisson.api; package org.redisson.api;
/**
*
* @author Nikita Koksharov
*
*/
public interface RAtomicDoubleAsync extends RExpirableAsync { public interface RAtomicDoubleAsync extends RExpirableAsync {
RFuture<Boolean> compareAndSetAsync(double expect, double update); RFuture<Boolean> compareAndSetAsync(double expect, double update);
@ -25,6 +30,13 @@ public interface RAtomicDoubleAsync extends RExpirableAsync {
RFuture<Double> getAsync(); RFuture<Double> getAsync();
/**
* Gets and deletes object
*
* @return the current value
*/
RFuture<Double> getAndDeleteAsync();
RFuture<Double> getAndAddAsync(double delta); RFuture<Double> getAndAddAsync(double delta);
RFuture<Double> getAndSetAsync(double newValue); RFuture<Double> getAndSetAsync(double newValue);

@ -63,6 +63,13 @@ public interface RAtomicLong extends RExpirable, RAtomicLongAsync {
*/ */
long get(); long get();
/**
* Gets and deletes object
*
* @return the current value
*/
long getAndDelete();
/** /**
* Atomically adds the given value to the current value. * Atomically adds the given value to the current value.
* *

@ -15,6 +15,11 @@
*/ */
package org.redisson.api; package org.redisson.api;
/**
*
* @author Nikita Koksharov
*
*/
public interface RAtomicLongAsync extends RExpirableAsync { public interface RAtomicLongAsync extends RExpirableAsync {
RFuture<Boolean> compareAndSetAsync(long expect, long update); RFuture<Boolean> compareAndSetAsync(long expect, long update);
@ -25,6 +30,13 @@ public interface RAtomicLongAsync extends RExpirableAsync {
RFuture<Long> getAsync(); RFuture<Long> getAsync();
/**
* Gets and deletes object
*
* @return the current value
*/
RFuture<Long> getAndDeleteAsync();
RFuture<Long> getAndAddAsync(long delta); RFuture<Long> getAndAddAsync(long delta);
RFuture<Long> getAndSetAsync(long newValue); RFuture<Long> getAndSetAsync(long newValue);

@ -23,11 +23,9 @@ import org.redisson.client.codec.Codec;
/** /**
* Interface for using pipeline feature. * Interface for using pipeline feature.
* <p> * <p>
* All method invocations on objects * All method invocations on objects got through this interface
* from this interface are batched to separate queue and could be executed later * are batched to separate queue and could be executed later
* with <code>execute()</code> or <code>executeAsync()</code> methods. * with <code>execute()</code> or <code>executeAsync()</code> methods.
* <p>
* Please be aware, atomicity <b>is not</b> guaranteed.
* *
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -410,9 +408,19 @@ public interface RBatch {
@Deprecated @Deprecated
RFuture<Void> executeSkipResultAsync(); RFuture<Void> executeSkipResultAsync();
/**
* Atomically executes all batched commands as a single command.
* <p>
* Please note, that in cluster mode all objects should be on the same cluster slot.
* https://github.com/antirez/redis/issues/3682
*
* @return
*/
RBatch atomic();
/** /**
* Inform Redis not to send reply for this batch. * Inform Redis not to send reply for this batch.
* Such approach saves response bandwidth. * Such approach saves network traffic.
* <p> * <p>
* NOTE: Redis 3.2+ required * NOTE: Redis 3.2+ required
* *

@ -35,6 +35,8 @@ public interface RBucket<V> extends RExpirable, RBucketAsync<V> {
V get(); V get();
V getAndDelete();
boolean trySet(V value); boolean trySet(V value);
boolean trySet(V value, long timeToLive, TimeUnit timeUnit); boolean trySet(V value, long timeToLive, TimeUnit timeUnit);

@ -35,6 +35,8 @@ public interface RBucketAsync<V> extends RExpirableAsync {
RFuture<V> getAsync(); RFuture<V> getAsync();
RFuture<V> getAndDeleteAsync();
RFuture<Boolean> trySetAsync(V value); RFuture<Boolean> trySetAsync(V value);
RFuture<Boolean> trySetAsync(V value, long timeToLive, TimeUnit timeUnit); RFuture<Boolean> trySetAsync(V value, long timeToLive, TimeUnit timeUnit);

@ -38,7 +38,7 @@ import org.redisson.api.mapreduce.RMapReduce;
public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K, V> { public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K, V> {
/** /**
* Loads all map entries to this Redis map. * Loads all map entries to this Redis map using {@link org.redisson.api.map.MapLoader}.
* *
* @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise. * @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise.
* @param parallelism - parallelism level, used to increase speed of process execution * @param parallelism - parallelism level, used to increase speed of process execution
@ -46,7 +46,7 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
void loadAll(boolean replaceExistingValues, int parallelism); void loadAll(boolean replaceExistingValues, int parallelism);
/** /**
* Loads map entries whose keys are listed in defined <code>keys</code> parameter. * Loads map entries using {@link org.redisson.api.map.MapLoader} whose keys are listed in defined <code>keys</code> parameter.
* *
* @param keys - map keys * @param keys - map keys
* @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise. * @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise.

@ -79,6 +79,12 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/ */
Integer revRank(V o); Integer revRank(V o);
/**
* Returns score of element or <code>null</code> if it doesn't exist.
*
* @param o - element
* @return score
*/
Double getScore(V o); Double getScore(V o);
/** /**

@ -52,6 +52,12 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
RFuture<Integer> revRankAsync(V o); RFuture<Integer> revRankAsync(V o);
/**
* Returns score of element or <code>null</code> if it doesn't exist.
*
* @param o - element
* @return score
*/
RFuture<Double> getScoreAsync(V o); RFuture<Double> getScoreAsync(V o);
/** /**

@ -17,6 +17,7 @@ package org.redisson.client.handler;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import org.redisson.client.RedisAskException; import org.redisson.client.RedisAskException;
@ -31,6 +32,8 @@ import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;
@ -134,12 +137,11 @@ public class CommandDecoder extends ReplayingDecoder<State> {
decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts()); decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts());
Channel channel = ctx.channel();
MultiDecoder<Object> decoder = messageDecoder(cmd, firstLevel.getParts()); MultiDecoder<Object> decoder = messageDecoder(cmd, firstLevel.getParts());
if (decoder != null) { if (decoder != null) {
Object result = decoder.decode(firstLevel.getParts(), state()); Object result = decoder.decode(firstLevel.getParts(), state());
if (data != null) { if (data != null) {
handleResult(cmd, null, result, true, channel); handleResult(cmd, null, result, true, ctx.channel());
} }
} }
} }
@ -149,7 +151,18 @@ public class CommandDecoder extends ReplayingDecoder<State> {
state().resetLevel(); state().resetLevel();
decode(in, cmd, null, ctx.channel()); decode(in, cmd, null, ctx.channel());
} else { } else {
decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts()); if (firstLevel.getLastList() != null) {
decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), firstLevel.getLastListSize(), firstLevel.getLastList());
firstLevel.setLastList(null);
firstLevel.setLastListSize(0);
if (in.isReadable()) {
decode(in, cmd, firstLevel.getParts(), ctx.channel());
}
decodeList(in, cmd, null, ctx.channel(), 0, firstLevel.getParts());
} else {
decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts());
}
} }
} }
} }
@ -160,22 +173,48 @@ public class CommandDecoder extends ReplayingDecoder<State> {
Throwable error = null; Throwable error = null;
while (in.writerIndex() > in.readerIndex()) { while (in.writerIndex() > in.readerIndex()) {
CommandData<Object, Object> cmd = null; CommandData<Object, Object> commandData = null;
try { try {
checkpoint(); checkpoint();
state().setBatchIndex(i); state().setBatchIndex(i);
cmd = (CommandData<Object, Object>) commandBatch.getCommands().get(i); RedisCommand<?> cmd = commandBatch.getCommands().get(i).getCommand();
decode(in, cmd, null, ctx.channel()); if (!commandBatch.isAtomic()
|| RedisCommands.EXEC.getName().equals(cmd.getName())
|| RedisCommands.WAIT.getName().equals(cmd.getName())) {
commandData = (CommandData<Object, Object>) commandBatch.getCommands().get(i);
}
decode(in, commandData, null, ctx.channel());
if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) {
List<Object> objects = (List<Object>) commandData.getPromise().getNow();
Iterator<Object> iter = objects.iterator();
boolean multiFound = false;
for (CommandData<?, ?> command : commandBatch.getCommands()) {
if (multiFound) {
if (!iter.hasNext()) {
break;
}
Object res = iter.next();
handleResult((CommandData<Object, Object>) command, null, res, false, ctx.channel());
}
if (RedisCommands.MULTI.getName().equals(command.getCommand().getName())) {
multiFound = true;
}
}
}
} catch (Exception e) { } catch (Exception e) {
cmd.tryFailure(e); commandData.tryFailure(e);
} }
i++; i++;
if (!cmd.isSuccess()) { if (commandData != null && !commandData.isSuccess()) {
error = cmd.cause(); error = commandData.cause();
} }
} }
if (commandBatch.isNoResult() || i == commandBatch.getCommands().size()) { if (commandBatch.isSkipResult() || i == commandBatch.getCommands().size()) {
RPromise<Void> promise = commandBatch.getPromise(); RPromise<Void> promise = commandBatch.getPromise();
if (error != null) { if (error != null) {
if (!promise.tryFailure(error) && promise.cause() instanceof RedisTimeoutException) { if (!promise.tryFailure(error) && promise.cause() instanceof RedisTimeoutException) {
@ -258,22 +297,34 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} }
handleResult(data, parts, result, false, channel); handleResult(data, parts, result, false, channel);
} else if (code == '*') { } else if (code == '*') {
int level = state().incLevel();
long size = readLong(in); long size = readLong(in);
List<Object> respParts; List<Object> respParts;
if (state().getLevels().size()-1 >= level) {
StateLevel stateLevel = state().getLevels().get(level); StateLevel lastLevel = state().getLastLevel();
respParts = stateLevel.getParts(); if (lastLevel != null && lastLevel.getSize() != lastLevel.getParts().size()) {
size = stateLevel.getSize();
} else {
respParts = new ArrayList<Object>(); respParts = new ArrayList<Object>();
if (state().isMakeCheckpoint()) { lastLevel.setLastListSize(size);
state().addLevel(new StateLevel(size, respParts)); lastLevel.setLastList(respParts);
} else {
int level = state().incLevel();
if (state().getLevels().size()-1 >= level) {
StateLevel stateLevel = state().getLevels().get(level);
respParts = stateLevel.getParts();
size = stateLevel.getSize();
} else {
respParts = new ArrayList<Object>();
if (state().isMakeCheckpoint()) {
state().addLevel(new StateLevel(size, respParts));
}
} }
} }
decodeList(in, data, parts, channel, size, respParts); decodeList(in, data, parts, channel, size, respParts);
if (lastLevel != null && lastLevel.getLastList() != null) {
lastLevel.setLastList(null);
lastLevel.setLastListSize(0);
}
} else { } else {
String dataStr = in.toString(0, in.writerIndex(), CharsetUtil.UTF_8); String dataStr = in.toString(0, in.writerIndex(), CharsetUtil.UTF_8);
throw new IllegalStateException("Can't decode replay: " + dataStr); throw new IllegalStateException("Can't decode replay: " + dataStr);

@ -96,7 +96,7 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<?, ?>> {
log.trace("channel: {} message: {}", ctx.channel(), out.toString(CharsetUtil.UTF_8)); log.trace("channel: {} message: {}", ctx.channel(), out.toString(CharsetUtil.UTF_8));
} }
} catch (Exception e) { } catch (Exception e) {
msg.getPromise().tryFailure(e); msg.tryFailure(e);
throw e; throw e;
} }
} }

@ -28,7 +28,6 @@ public class State {
private int level = -1; private int level = -1;
private List<StateLevel> levels; private List<StateLevel> levels;
private DecoderState decoderStateCopy;
private final boolean makeCheckpoint; private final boolean makeCheckpoint;
public State(boolean makeCheckpoint) { public State(boolean makeCheckpoint) {
@ -41,6 +40,7 @@ public class State {
public void resetLevel() { public void resetLevel() {
level = -1; level = -1;
levels.clear();
} }
public int decLevel() { public int decLevel() {
return --level; return --level;
@ -49,6 +49,13 @@ public class State {
return ++level; return ++level;
} }
public StateLevel getLastLevel() {
if (levels == null || levels.isEmpty()) {
return null;
}
return levels.get(level);
}
public void addLevel(StateLevel stateLevel) { public void addLevel(StateLevel stateLevel) {
if (levels == null) { if (levels == null) {
levels = new ArrayList<StateLevel>(2); levels = new ArrayList<StateLevel>(2);
@ -76,17 +83,10 @@ public class State {
this.decoderState = decoderState; this.decoderState = decoderState;
} }
public DecoderState getDecoderStateCopy() {
return decoderStateCopy;
}
public void setDecoderStateCopy(DecoderState decoderStateCopy) {
this.decoderStateCopy = decoderStateCopy;
}
@Override @Override
public String toString() { public String toString() {
return "State [batchIndex=" + batchIndex + ", decoderState=" + decoderState + ", level=" + level + ", levels=" return "State [batchIndex=" + batchIndex + ", decoderState=" + decoderState + ", level=" + level + ", levels="
+ levels + ", decoderStateCopy=" + decoderStateCopy + "]"; + levels + "]";
} }

@ -21,6 +21,8 @@ public class StateLevel {
private long size; private long size;
private List<Object> parts; private List<Object> parts;
private long lastListSize;
private List<Object> lastList;
public StateLevel(long size, List<Object> parts) { public StateLevel(long size, List<Object> parts) {
super(); super();
@ -28,6 +30,20 @@ public class StateLevel {
this.parts = parts; this.parts = parts;
} }
public long getLastListSize() {
return lastListSize;
}
public void setLastListSize(long lastListSize) {
this.lastListSize = lastListSize;
}
public List<Object> getLastList() {
return lastList;
}
public void setLastList(List<Object> lastList) {
this.lastList = lastList;
}
public long getSize() { public long getSize() {
return size; return size;
} }

@ -29,25 +29,31 @@ public class CommandsData implements QueueCommand {
private final List<CommandData<?, ?>> commands; private final List<CommandData<?, ?>> commands;
private final RPromise<Void> promise; private final RPromise<Void> promise;
private final boolean noResult; private final boolean skipResult;
private final boolean atomic;
public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands) { public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands) {
this(promise, commands, false); this(promise, commands, false, false);
} }
public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, boolean noResult) { public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, boolean skipResult, boolean atomic) {
super(); super();
this.promise = promise; this.promise = promise;
this.commands = commands; this.commands = commands;
this.noResult = noResult; this.skipResult = skipResult;
this.atomic = atomic;
} }
public RPromise<Void> getPromise() { public RPromise<Void> getPromise() {
return promise; return promise;
} }
public boolean isNoResult() { public boolean isAtomic() {
return noResult; return atomic;
}
public boolean isSkipResult() {
return skipResult;
} }
public List<CommandData<?, ?>> getCommands() { public List<CommandData<?, ?>> getCommands() {

@ -32,6 +32,7 @@ import org.redisson.client.protocol.convertor.BooleanNullReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNullSafeReplayConvertor; import org.redisson.client.protocol.convertor.BooleanNullSafeReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor; import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.DoubleNullSafeReplayConvertor;
import org.redisson.client.protocol.convertor.DoubleReplayConvertor; import org.redisson.client.protocol.convertor.DoubleReplayConvertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.KeyValueConvertor; import org.redisson.client.protocol.convertor.KeyValueConvertor;
@ -133,7 +134,7 @@ public interface RedisCommands {
RedisCommand<List<ScoredEntry<Object>>> ZRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGE", new ScoredSortedSetReplayDecoder<Object>()); RedisCommand<List<ScoredEntry<Object>>> ZRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZRANGEBYSCORE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGEBYSCORE", new ScoredSortedSetReplayDecoder<Object>()); RedisCommand<List<ScoredEntry<Object>>> ZRANGEBYSCORE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGEBYSCORE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<ListScanResult<Object>> ZSCAN = new RedisCommand<ListScanResult<Object>>("ZSCAN", new ListMultiDecoder(new LongMultiDecoder(), new ScoredSortedSetScanDecoder<Object>(), new ScoredSortedSetScanReplayDecoder())); RedisCommand<ListScanResult<Object>> ZSCAN = new RedisCommand<ListScanResult<Object>>("ZSCAN", new ListMultiDecoder(new LongMultiDecoder(), new ScoredSortedSetScanDecoder<Object>(), new ScoredSortedSetScanReplayDecoder()));
RedisStrictCommand<Double> ZINCRBY = new RedisStrictCommand<Double>("ZINCRBY", new DoubleReplayConvertor()); RedisStrictCommand<Double> ZINCRBY = new RedisStrictCommand<Double>("ZINCRBY", new DoubleNullSafeReplayConvertor());
RedisCommand<ListScanResult<String>> SCAN = new RedisCommand<ListScanResult<String>>("SCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder<String>(), new ListScanResultReplayDecoder())); RedisCommand<ListScanResult<String>> SCAN = new RedisCommand<ListScanResult<String>>("SCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder<String>(), new ListScanResultReplayDecoder()));
RedisStrictCommand<String> RANDOM_KEY = new RedisStrictCommand<String>("RANDOMKEY", new StringDataDecoder()); RedisStrictCommand<String> RANDOM_KEY = new RedisStrictCommand<String>("RANDOMKEY", new StringDataDecoder());
@ -221,7 +222,9 @@ public interface RedisCommands {
RedisStrictCommand<String> EVAL_STRING = new RedisStrictCommand<String>("EVAL", new StringReplayDecoder()); RedisStrictCommand<String> EVAL_STRING = new RedisStrictCommand<String>("EVAL", new StringReplayDecoder());
RedisStrictCommand<String> EVAL_STRING_DATA = new RedisStrictCommand<String>("EVAL", new StringDataDecoder()); RedisStrictCommand<String> EVAL_STRING_DATA = new RedisStrictCommand<String>("EVAL", new StringDataDecoder());
RedisStrictCommand<Integer> EVAL_INTEGER = new RedisStrictCommand<Integer>("EVAL", new IntegerReplayConvertor()); RedisStrictCommand<Integer> EVAL_INTEGER = new RedisStrictCommand<Integer>("EVAL", new IntegerReplayConvertor());
RedisStrictCommand<Double> EVAL_DOUBLE = new RedisStrictCommand<Double>("EVAL", new DoubleNullSafeReplayConvertor());
RedisStrictCommand<Long> EVAL_LONG = new RedisStrictCommand<Long>("EVAL"); RedisStrictCommand<Long> EVAL_LONG = new RedisStrictCommand<Long>("EVAL");
RedisStrictCommand<Long> EVAL_LONG_SAFE = new RedisStrictCommand<Long>("EVAL", new LongReplayConvertor());
RedisStrictCommand<Void> EVAL_VOID = new RedisStrictCommand<Void>("EVAL", new VoidReplayConvertor()); RedisStrictCommand<Void> EVAL_VOID = new RedisStrictCommand<Void>("EVAL", new VoidReplayConvertor());
RedisCommand<List<Object>> EVAL_LIST = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>()); RedisCommand<List<Object>> EVAL_LIST = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>());
RedisCommand<Set<Object>> EVAL_SET = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder<Object>()); RedisCommand<Set<Object>> EVAL_SET = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder<Object>());
@ -235,7 +238,7 @@ public interface RedisCommands {
RedisStrictCommand<Long> INCR = new RedisStrictCommand<Long>("INCR"); RedisStrictCommand<Long> INCR = new RedisStrictCommand<Long>("INCR");
RedisStrictCommand<Long> INCRBY = new RedisStrictCommand<Long>("INCRBY"); RedisStrictCommand<Long> INCRBY = new RedisStrictCommand<Long>("INCRBY");
RedisStrictCommand<Double> INCRBYFLOAT = new RedisStrictCommand<Double>("INCRBYFLOAT", new DoubleReplayConvertor()); RedisStrictCommand<Double> INCRBYFLOAT = new RedisStrictCommand<Double>("INCRBYFLOAT", new DoubleNullSafeReplayConvertor());
RedisStrictCommand<Long> DECR = new RedisStrictCommand<Long>("DECR"); RedisStrictCommand<Long> DECR = new RedisStrictCommand<Long>("DECR");
RedisStrictCommand<Void> AUTH = new RedisStrictCommand<Void>("AUTH", new VoidReplayConvertor()); RedisStrictCommand<Void> AUTH = new RedisStrictCommand<Void>("AUTH", new VoidReplayConvertor());
@ -281,6 +284,7 @@ public interface RedisCommands {
RedisCommand<Object> GET = new RedisCommand<Object>("GET"); RedisCommand<Object> GET = new RedisCommand<Object>("GET");
RedisStrictCommand<Long> GET_LONG = new RedisStrictCommand<Long>("GET", new LongReplayConvertor()); RedisStrictCommand<Long> GET_LONG = new RedisStrictCommand<Long>("GET", new LongReplayConvertor());
RedisStrictCommand<Integer> GET_INTEGER = new RedisStrictCommand<Integer>("GET", new IntegerReplayConvertor()); RedisStrictCommand<Integer> GET_INTEGER = new RedisStrictCommand<Integer>("GET", new IntegerReplayConvertor());
RedisStrictCommand<Double> GET_DOUBLE = new RedisStrictCommand<Double>("GET", new DoubleNullSafeReplayConvertor());
RedisCommand<Object> GETSET = new RedisCommand<Object>("GETSET"); RedisCommand<Object> GETSET = new RedisCommand<Object>("GETSET");
RedisCommand<Object> GETRANGE = new RedisCommand<Object>("GETRANGE"); RedisCommand<Object> GETRANGE = new RedisCommand<Object>("GETRANGE");
RedisCommand<Object> APPEND = new RedisCommand<Object>("APPEND"); RedisCommand<Object> APPEND = new RedisCommand<Object>("APPEND");
@ -290,9 +294,9 @@ public interface RedisCommands {
RedisCommand<Boolean> SETNX = new RedisCommand<Boolean>("SETNX", new BooleanReplayConvertor()); RedisCommand<Boolean> SETNX = new RedisCommand<Boolean>("SETNX", new BooleanReplayConvertor());
RedisCommand<Void> PSETEX = new RedisCommand<Void>("PSETEX", new VoidReplayConvertor()); RedisCommand<Void> PSETEX = new RedisCommand<Void>("PSETEX", new VoidReplayConvertor());
RedisStrictCommand<Long> TOUCH_LONG = new RedisStrictCommand<Long>("TOUCH"); RedisStrictCommand<Long> TOUCH_LONG = new RedisStrictCommand<Long>("TOUCH", new LongReplayConvertor());
RedisStrictCommand<Boolean> TOUCH = new RedisStrictCommand<Boolean>("TOUCH", new BooleanReplayConvertor()); RedisStrictCommand<Boolean> TOUCH = new RedisStrictCommand<Boolean>("TOUCH", new BooleanReplayConvertor());
RedisStrictCommand<Long> EXISTS_LONG = new RedisStrictCommand<Long>("EXISTS"); RedisStrictCommand<Long> EXISTS_LONG = new RedisStrictCommand<Long>("EXISTS", new LongReplayConvertor());
RedisStrictCommand<Boolean> EXISTS = new RedisStrictCommand<Boolean>("EXISTS", new BooleanReplayConvertor()); RedisStrictCommand<Boolean> EXISTS = new RedisStrictCommand<Boolean>("EXISTS", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> NOT_EXISTS = new RedisStrictCommand<Boolean>("EXISTS", new BooleanNumberReplayConvertor(1L)); RedisStrictCommand<Boolean> NOT_EXISTS = new RedisStrictCommand<Boolean>("EXISTS", new BooleanNumberReplayConvertor(1L));

@ -0,0 +1,34 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;
/**
*
* @author Nikita Koksharov
*
*/
public class DoubleNullSafeReplayConvertor extends DoubleReplayConvertor {
@Override
public Double convert(Object obj) {
Double r = super.convert(obj);
if (r == null) {
return 0.0;
}
return r;
}
}

@ -42,6 +42,7 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisLoadingException; import org.redisson.client.RedisLoadingException;
import org.redisson.client.RedisMovedException; import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisRedirectException;
import org.redisson.client.RedisTimeoutException; import org.redisson.client.RedisTimeoutException;
import org.redisson.client.RedisTryAgainException; import org.redisson.client.RedisTryAgainException;
import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.WriteRedisConnectionException;
@ -62,6 +63,7 @@ import org.redisson.connection.NodeSource.Redirect;
import org.redisson.misc.LogHelper; import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory; import org.redisson.misc.RedissonObjectFactory;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -181,36 +183,36 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override @Override
public <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise(); RPromise<R> mainPromise = new RedissonPromise<R>();
async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0); async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false);
return mainPromise; return mainPromise;
} }
@Override @Override
public <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise(); RPromise<R> mainPromise = new RedissonPromise<R>();
int slot = connectionManager.calcSlot(name); int slot = connectionManager.calcSlot(name);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0); async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false);
return mainPromise; return mainPromise;
} }
@Override @Override
public <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise(); RPromise<R> mainPromise = new RedissonPromise<R>();
async(true, new NodeSource(client), codec, command, params, mainPromise, 0); async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false);
return mainPromise; return mainPromise;
} }
@Override @Override
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) { public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) {
final RPromise<Collection<R>> mainPromise = connectionManager.newPromise(); final RPromise<Collection<R>> mainPromise = new RedissonPromise<Collection<R>>();
final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet(); final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
final List<R> results = new ArrayList<R>(); final List<R> results = new ArrayList<R>();
final AtomicInteger counter = new AtomicInteger(nodes.size()); final AtomicInteger counter = new AtomicInteger(nodes.size());
FutureListener<R> listener = new FutureListener<R>() { FutureListener<R> listener = new FutureListener<R>() {
@Override @Override
public void operationComplete(Future<R> future) throws Exception { public void operationComplete(Future<R> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess() && !(future.cause() instanceof RedisRedirectException)) {
mainPromise.tryFailure(future.cause()); mainPromise.tryFailure(future.cause());
return; return;
} }
@ -234,16 +236,16 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}; };
for (MasterSlaveEntry entry : nodes) { for (MasterSlaveEntry entry : nodes) {
RPromise<R> promise = connectionManager.newPromise(); RPromise<R> promise = new RedissonPromise<R>();
promise.addListener(listener); promise.addListener(listener);
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0); async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true);
} }
return mainPromise; return mainPromise;
} }
@Override @Override
public <T, R> RFuture<R> readRandomAsync(RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> readRandomAsync(RedisCommand<T> command, Object... params) {
final RPromise<R> mainPromise = connectionManager.newPromise(); final RPromise<R> mainPromise = new RedissonPromise<R>();
final List<MasterSlaveEntry> nodes = new ArrayList<MasterSlaveEntry>(connectionManager.getEntrySet()); final List<MasterSlaveEntry> nodes = new ArrayList<MasterSlaveEntry>(connectionManager.getEntrySet());
Collections.shuffle(nodes); Collections.shuffle(nodes);
@ -253,7 +255,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private <R, T> void retryReadRandomAsync(final RedisCommand<T> command, final RPromise<R> mainPromise, private <R, T> void retryReadRandomAsync(final RedisCommand<T> command, final RPromise<R> mainPromise,
final List<MasterSlaveEntry> nodes, final Object... params) { final List<MasterSlaveEntry> nodes, final Object... params) {
final RPromise<R> attemptPromise = connectionManager.newPromise(); final RPromise<R> attemptPromise = new RedissonPromise<R>();
attemptPromise.addListener(new FutureListener<R>() { attemptPromise.addListener(new FutureListener<R>() {
@Override @Override
public void operationComplete(Future<R> future) throws Exception { public void operationComplete(Future<R> future) throws Exception {
@ -274,7 +276,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}); });
MasterSlaveEntry entry = nodes.remove(0); MasterSlaveEntry entry = nodes.remove(0);
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, attemptPromise, 0); async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, attemptPromise, 0, false);
} }
@Override @Override
@ -293,19 +295,24 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
private <T, R> RFuture<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object... params) { private <T, R> RFuture<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object... params) {
final RPromise<R> mainPromise = connectionManager.newPromise(); final RPromise<R> mainPromise = new RedissonPromise<R>();
final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet(); final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
final AtomicInteger counter = new AtomicInteger(nodes.size()); final AtomicInteger counter = new AtomicInteger(nodes.size());
FutureListener<T> listener = new FutureListener<T>() { FutureListener<T> listener = new FutureListener<T>() {
@Override @Override
public void operationComplete(Future<T> future) throws Exception { public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess() && !(future.cause() instanceof RedisRedirectException)) {
mainPromise.tryFailure(future.cause()); mainPromise.tryFailure(future.cause());
return; return;
} }
T result = future.getNow();
if (future.cause() instanceof RedisRedirectException) {
result = command.getConvertor().convert(result);
}
if (callback != null) { if (callback != null) {
callback.onSlotResult(future.getNow()); callback.onSlotResult(result);
} }
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
if (callback != null) { if (callback != null) {
@ -318,9 +325,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}; };
for (MasterSlaveEntry entry : nodes) { for (MasterSlaveEntry entry : nodes) {
RPromise<T> promise = connectionManager.newPromise(); RPromise<T> promise = new RedissonPromise<T>();
promise.addListener(listener); promise.addListener(listener);
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0); async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true);
} }
return mainPromise; return mainPromise;
} }
@ -339,22 +346,22 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override @Override
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise(); RPromise<R> mainPromise = new RedissonPromise<R>();
NodeSource source = getNodeSource(key); NodeSource source = getNodeSource(key);
async(true, source, codec, command, params, mainPromise, 0); async(true, source, codec, command, params, mainPromise, 0, false);
return mainPromise; return mainPromise;
} }
public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise(); RPromise<R> mainPromise = new RedissonPromise<R>();
async(true, new NodeSource(entry), codec, command, params, mainPromise, 0); async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false);
return mainPromise; return mainPromise;
} }
@Override @Override
public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise(); RPromise<R> mainPromise = new RedissonPromise<R>();
async(false, new NodeSource(entry), codec, command, params, mainPromise, 0); async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false);
return mainPromise; return mainPromise;
} }
@ -396,14 +403,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
public <T, R> RFuture<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) { public <T, R> RFuture<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) {
final RPromise<R> mainPromise = connectionManager.newPromise(); final RPromise<R> mainPromise = new RedissonPromise<R>();
final Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet(); final Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
final AtomicInteger counter = new AtomicInteger(entries.size()); final AtomicInteger counter = new AtomicInteger(entries.size());
FutureListener<T> listener = new FutureListener<T>() { FutureListener<T> listener = new FutureListener<T>() {
@Override @Override
public void operationComplete(Future<T> future) throws Exception { public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess() && !(future.cause() instanceof RedisRedirectException)) {
mainPromise.tryFailure(future.cause()); mainPromise.tryFailure(future.cause());
return; return;
} }
@ -422,21 +429,21 @@ public class CommandAsyncService implements CommandAsyncExecutor {
args.addAll(keys); args.addAll(keys);
args.addAll(Arrays.asList(params)); args.addAll(Arrays.asList(params));
for (MasterSlaveEntry entry : entries) { for (MasterSlaveEntry entry : entries) {
RPromise<T> promise = connectionManager.newPromise(); RPromise<T> promise = new RedissonPromise<T>();
promise.addListener(listener); promise.addListener(listener);
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0); async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true);
} }
return mainPromise; return mainPromise;
} }
private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) { private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise(); RPromise<R> mainPromise = new RedissonPromise<R>();
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length); List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script); args.add(script);
args.add(keys.size()); args.add(keys.size());
args.addAll(keys); args.addAll(keys);
args.addAll(Arrays.asList(params)); args.addAll(Arrays.asList(params));
async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0); async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false);
return mainPromise; return mainPromise;
} }
@ -447,14 +454,15 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override @Override
public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise(); RPromise<R> mainPromise = new RedissonPromise<R>();
NodeSource source = getNodeSource(key); NodeSource source = getNodeSource(key);
async(false, source, codec, command, params, mainPromise, 0); async(false, source, codec, command, params, mainPromise, 0, false);
return mainPromise; return mainPromise;
} }
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec, protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt) { final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt,
final boolean ignoreRedirect) {
if (mainPromise.isCancelled()) { if (mainPromise.isCancelled()) {
free(params); free(params);
return; return;
@ -490,7 +498,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
connectionFuture = connectionManager.connectionWriteOp(source, command); connectionFuture = connectionManager.connectionWriteOp(source, command);
} }
final RPromise<R> attemptPromise = connectionManager.newPromise(); final RPromise<R> attemptPromise = new RedissonPromise<R>();
details.init(connectionFuture, attemptPromise, details.init(connectionFuture, attemptPromise,
readOnlyMode, source, codec, command, params, mainPromise, attempt); readOnlyMode, source, codec, command, params, mainPromise, attempt);
@ -566,7 +574,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
count, details.getCommand(), Arrays.toString(details.getParams())); count, details.getCommand(), Arrays.toString(details.getParams()));
} }
details.removeMainPromiseListener(); details.removeMainPromiseListener();
async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count); async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect);
AsyncDetails.release(details); AsyncDetails.release(details);
} }
@ -597,10 +605,10 @@ public class CommandAsyncService implements CommandAsyncExecutor {
final RedisConnection connection = connFuture.getNow(); final RedisConnection connection = connFuture.getNow();
if (details.getSource().getRedirect() == Redirect.ASK) { if (details.getSource().getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2); List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
RPromise<Void> promise = connectionManager.newPromise(); RPromise<Void> promise = new RedissonPromise<Void>();
list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{})); list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{}));
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
RPromise<Void> main = connectionManager.newPromise(); RPromise<Void> main = new RedissonPromise<Void>();
ChannelFuture future = connection.send(new CommandsData(main, list)); ChannelFuture future = connection.send(new CommandsData(main, list));
details.setWriteFuture(future); details.setWriteFuture(future);
} else { } else {
@ -626,7 +634,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
attemptPromise.addListener(new FutureListener<R>() { attemptPromise.addListener(new FutureListener<R>() {
@Override @Override
public void operationComplete(Future<R> future) throws Exception { public void operationComplete(Future<R> future) throws Exception {
checkAttemptFuture(source, details, future); checkAttemptFuture(source, details, future, ignoreRedirect);
} }
}); });
} }
@ -780,7 +788,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
private <R, V> void checkAttemptFuture(final NodeSource source, final AsyncDetails<V, R> details, private <R, V> void checkAttemptFuture(final NodeSource source, final AsyncDetails<V, R> details,
Future<R> future) { Future<R> future, boolean ignoreRedirect) {
details.getTimeout().cancel(); details.getTimeout().cancel();
if (future.isCancelled()) { if (future.isCancelled()) {
return; return;
@ -788,25 +796,25 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.removeMainPromiseListener(); details.removeMainPromiseListener();
if (future.cause() instanceof RedisMovedException) { if (future.cause() instanceof RedisMovedException && !ignoreRedirect) {
RedisMovedException ex = (RedisMovedException) future.cause(); RedisMovedException ex = (RedisMovedException) future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(), async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details); AsyncDetails.release(details);
return; return;
} }
if (future.cause() instanceof RedisAskException) { if (future.cause() instanceof RedisAskException && !ignoreRedirect) {
RedisAskException ex = (RedisAskException) future.cause(); RedisAskException ex = (RedisAskException) future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(), async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details); AsyncDetails.release(details);
return; return;
} }
if (future.cause() instanceof RedisLoadingException) { if (future.cause() instanceof RedisLoadingException) {
async(details.isReadOnlyMode(), source, details.getCodec(), async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details); AsyncDetails.release(details);
return; return;
} }
@ -816,7 +824,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
async(details.isReadOnlyMode(), source, details.getCodec(), async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
} }
}, 1, TimeUnit.SECONDS); }, 1, TimeUnit.SECONDS);

@ -111,7 +111,7 @@ public class CommandBatchService extends CommandAsyncService {
@Override @Override
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource, protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt) { Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect) {
if (executed) { if (executed) {
throw new IllegalStateException("Batch already has been executed!"); throw new IllegalStateException("Batch already has been executed!");
} }
@ -140,18 +140,18 @@ public class CommandBatchService extends CommandAsyncService {
} }
public BatchResult<?> execute() { public BatchResult<?> execute() {
RFuture<BatchResult<?>> f = executeAsync(0, 0, false, 0, 0, 0); RFuture<BatchResult<?>> f = executeAsync(0, 0, false, 0, 0, 0, false);
return get(f); return get(f);
} }
public BatchResult<?> execute(int syncSlaves, long syncTimeout, boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) { public BatchResult<?> execute(int syncSlaves, long syncTimeout, boolean noResult, long responseTimeout, int retryAttempts, long retryInterval, boolean atomic) {
RFuture<BatchResult<?>> f = executeAsync(syncSlaves, syncTimeout, noResult, responseTimeout, retryAttempts, retryInterval); RFuture<BatchResult<?>> f = executeAsync(syncSlaves, syncTimeout, noResult, responseTimeout, retryAttempts, retryInterval, atomic);
return get(f); return get(f);
} }
public RFuture<Void> executeAsyncVoid() { public RFuture<Void> executeAsyncVoid() {
final RedissonPromise<Void> promise = new RedissonPromise<Void>(); final RedissonPromise<Void> promise = new RedissonPromise<Void>();
RFuture<BatchResult<?>> res = executeAsync(0, 0, false, 0, 0, 0); RFuture<BatchResult<?>> res = executeAsync(0, 0, false, 0, 0, 0, false);
res.addListener(new FutureListener<BatchResult<?>>() { res.addListener(new FutureListener<BatchResult<?>>() {
@Override @Override
public void operationComplete(Future<BatchResult<?>> future) throws Exception { public void operationComplete(Future<BatchResult<?>> future) throws Exception {
@ -166,10 +166,10 @@ public class CommandBatchService extends CommandAsyncService {
} }
public RFuture<List<?>> executeAsync() { public RFuture<List<?>> executeAsync() {
return executeAsync(0, 0, false, 0, 0, 0); return executeAsync(0, 0, false, 0, 0, 0, false);
} }
public <R> RFuture<R> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval) { public <R> RFuture<R> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval, boolean atomic) {
if (executed) { if (executed) {
throw new IllegalStateException("Batch already executed!"); throw new IllegalStateException("Batch already executed!");
} }
@ -179,6 +179,15 @@ public class CommandBatchService extends CommandAsyncService {
} }
executed = true; executed = true;
if (atomic) {
for (Entry entry : commands.values()) {
BatchCommandData<?, ?> multiCommand = new BatchCommandData(RedisCommands.MULTI, new Object[] {}, index.incrementAndGet());
entry.getCommands().addFirst(multiCommand);
BatchCommandData<?, ?> execCommand = new BatchCommandData(RedisCommands.EXEC, new Object[] {}, index.incrementAndGet());
entry.getCommands().add(execCommand);
}
}
if (skipResult) { if (skipResult) {
for (Entry entry : commands.values()) { for (Entry entry : commands.values()) {
BatchCommandData<?, ?> offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet()); BatchCommandData<?, ?> offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
@ -226,12 +235,16 @@ public class CommandBatchService extends CommandAsyncService {
List<Object> responses = new ArrayList<Object>(entries.size()); List<Object> responses = new ArrayList<Object>(entries.size());
int syncedSlaves = 0; int syncedSlaves = 0;
for (BatchCommandData<?, ?> commandEntry : entries) { for (BatchCommandData<?, ?> commandEntry : entries) {
if (!isWaitCommand(commandEntry)) { if (!isWaitCommand(commandEntry)
&& !commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
Object entryResult = commandEntry.getPromise().getNow(); Object entryResult = commandEntry.getPromise().getNow();
entryResult = tryHandleReference(entryResult); entryResult = tryHandleReference(entryResult);
responses.add(entryResult); responses.add(entryResult);
} else { } else {
syncedSlaves = (Integer) commandEntry.getPromise().getNow(); if (isWaitCommand(commandEntry)) {
syncedSlaves = (Integer) commandEntry.getPromise().getNow();
}
} }
} }
@ -262,13 +275,13 @@ public class CommandBatchService extends CommandAsyncService {
} }
for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) { for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, skipResult, responseTimeout, retryAttempts, retryInterval); execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, skipResult, responseTimeout, retryAttempts, retryInterval, atomic);
} }
return resultPromise; return resultPromise;
} }
private void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots, private void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots,
final int attempt, final boolean noResult, final long responseTimeout, final int retryAttempts, final long retryInterval) { final int attempt, final boolean noResult, final long responseTimeout, final int retryAttempts, final long retryInterval, final boolean atomic) {
if (mainPromise.isCancelled()) { if (mainPromise.isCancelled()) {
free(entry); free(entry);
return; return;
@ -364,7 +377,7 @@ public class CommandBatchService extends CommandAsyncService {
int count = attempt + 1; int count = attempt + 1;
mainPromise.removeListener(mainPromiseListener); mainPromise.removeListener(mainPromiseListener);
execute(entry, source, mainPromise, slots, count, noResult, responseTimeout, retryAttempts, retryInterval); execute(entry, source, mainPromise, slots, count, noResult, responseTimeout, retryAttempts, retryInterval, atomic);
} }
}; };
@ -380,7 +393,7 @@ public class CommandBatchService extends CommandAsyncService {
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception { public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, noResult, responseTimeout, attempts); checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, noResult, responseTimeout, attempts, atomic);
} }
}); });
@ -398,19 +411,19 @@ public class CommandBatchService extends CommandAsyncService {
RedisMovedException ex = (RedisMovedException)future.cause(); RedisMovedException ex = (RedisMovedException)future.cause();
entry.clearErrors(); entry.clearErrors();
NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED); NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED);
execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval); execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval, atomic);
return; return;
} }
if (future.cause() instanceof RedisAskException) { if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause(); RedisAskException ex = (RedisAskException)future.cause();
entry.clearErrors(); entry.clearErrors();
NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK); NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK);
execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval); execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval, atomic);
return; return;
} }
if (future.cause() instanceof RedisLoadingException) { if (future.cause() instanceof RedisLoadingException) {
entry.clearErrors(); entry.clearErrors();
execute(entry, source, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval); execute(entry, source, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval, atomic);
return; return;
} }
if (future.cause() instanceof RedisTryAgainException) { if (future.cause() instanceof RedisTryAgainException) {
@ -418,7 +431,7 @@ public class CommandBatchService extends CommandAsyncService {
connectionManager.newTimeout(new TimerTask() { connectionManager.newTimeout(new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
execute(entry, source, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval); execute(entry, source, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval, atomic);
} }
}, 1, TimeUnit.SECONDS); }, 1, TimeUnit.SECONDS);
return; return;
@ -438,7 +451,7 @@ public class CommandBatchService extends CommandAsyncService {
} }
private void checkWriteFuture(Entry entry, final RPromise<Void> attemptPromise, AsyncDetails details, private void checkWriteFuture(Entry entry, final RPromise<Void> attemptPromise, AsyncDetails details,
final RedisConnection connection, ChannelFuture future, boolean noResult, long responseTimeout, int attempts) { final RedisConnection connection, ChannelFuture future, long responseTimeout, int attempts) {
if (future.isCancelled() || attemptPromise.isDone()) { if (future.isCancelled() || attemptPromise.isDone()) {
return; return;
} }
@ -471,7 +484,7 @@ public class CommandBatchService extends CommandAsyncService {
private void checkConnectionFuture(final Entry entry, final NodeSource source, private void checkConnectionFuture(final Entry entry, final NodeSource source,
final RPromise<Void> mainPromise, final RPromise<Void> attemptPromise, final AsyncDetails details, final RPromise<Void> mainPromise, final RPromise<Void> attemptPromise, final AsyncDetails details,
RFuture<RedisConnection> connFuture, final boolean noResult, final long responseTimeout, final int attempts) { RFuture<RedisConnection> connFuture, final boolean noResult, final long responseTimeout, final int attempts, boolean atomic) {
if (connFuture.isCancelled()) { if (connFuture.isCancelled()) {
return; return;
} }
@ -495,20 +508,20 @@ public class CommandBatchService extends CommandAsyncService {
list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {})); list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {}));
} }
for (BatchCommandData<?, ?> c : entry.getCommands()) { for (BatchCommandData<?, ?> c : entry.getCommands()) {
if (c.getPromise().isSuccess() && !isWaitCommand(c)) { if (c.getPromise().isSuccess() && !isWaitCommand(c) && !atomic) {
// skip successful commands // skip successful commands
continue; continue;
} }
list.add(c); list.add(c);
} }
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list, noResult)); ChannelFuture future = connection.send(new CommandsData(attemptPromise, list, noResult, atomic));
details.setWriteFuture(future); details.setWriteFuture(future);
details.getWriteFuture().addListener(new ChannelFutureListener() { details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(entry, attemptPromise, details, connection, future, noResult, responseTimeout, attempts); checkWriteFuture(entry, attemptPromise, details, connection, future, responseTimeout, attempts);
} }
}); });

@ -55,16 +55,16 @@ public class CommandReactiveBatchService extends CommandReactiveService {
@Override @Override
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource, protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt) { Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect) {
batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt); batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect);
} }
public RFuture<BatchResult<?>> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval) { public RFuture<BatchResult<?>> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval, boolean atomic) {
for (Publisher<?> publisher : publishers) { for (Publisher<?> publisher : publishers) {
Flux.from(publisher).subscribe(); Flux.from(publisher).subscribe();
} }
return batchService.executeAsync(syncSlaves, syncTimeout, skipResult, responseTimeout, retryAttempts, retryInterval); return batchService.executeAsync(syncSlaves, syncTimeout, skipResult, responseTimeout, retryAttempts, retryInterval, atomic);
} }
@Override @Override

@ -64,6 +64,7 @@ public class RedissonBatchReactive implements RBatchReactive {
private int syncSlaves; private int syncSlaves;
private long syncTimeout; private long syncTimeout;
private boolean skipResult; private boolean skipResult;
private boolean atomic;
public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, CommandReactiveService commandExecutor) { public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, CommandReactiveService commandExecutor) {
this.evictionScheduler = evictionScheduler; this.evictionScheduler = evictionScheduler;
@ -221,11 +222,16 @@ public class RedissonBatchReactive implements RBatchReactive {
return commandExecutor.reactive(new Supplier<RFuture<BatchResult<?>>>() { return commandExecutor.reactive(new Supplier<RFuture<BatchResult<?>>>() {
@Override @Override
public RFuture<BatchResult<?>> get() { public RFuture<BatchResult<?>> get() {
return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval); return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval, atomic);
} }
}); });
} }
public RBatchReactive atomic() {
this.atomic = true;
return this;
}
@Override @Override
public RBatchReactive syncSlaves(int slaves, long timeout, TimeUnit unit) { public RBatchReactive syncSlaves(int slaves, long timeout, TimeUnit unit) {
this.syncSlaves = slaves; this.syncSlaves = slaves;

@ -9,6 +9,23 @@ import org.redisson.api.RAtomicDouble;
public class RedissonAtomicDoubleTest extends BaseTest { public class RedissonAtomicDoubleTest extends BaseTest {
@Test
public void testGetZero() {
RAtomicDouble ad2 = redisson.getAtomicDouble("test");
assertThat(ad2.get()).isZero();
}
@Test
public void testGetAndDelete() {
RAtomicDouble al = redisson.getAtomicDouble("test");
al.set(10.34);
assertThat(al.getAndDelete()).isEqualTo(10.34);
assertThat(al.isExists()).isFalse();
RAtomicDouble ad2 = redisson.getAtomicDouble("test2");
assertThat(ad2.getAndDelete()).isZero();
}
@Test @Test
public void testCompareAndSet() { public void testCompareAndSet() {
RAtomicDouble al = redisson.getAtomicDouble("test"); RAtomicDouble al = redisson.getAtomicDouble("test");

@ -1,11 +1,30 @@
package org.redisson; package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.redisson.api.RAtomicLong; import org.redisson.api.RAtomicLong;
public class RedissonAtomicLongTest extends BaseTest { public class RedissonAtomicLongTest extends BaseTest {
@Test
public void testGetZero() {
RAtomicLong ad2 = redisson.getAtomicLong("test");
assertThat(ad2.get()).isZero();
}
@Test
public void testGetAndDelete() {
RAtomicLong al = redisson.getAtomicLong("test");
al.set(10);
assertThat(al.getAndDelete()).isEqualTo(10);
assertThat(al.isExists()).isFalse();
RAtomicLong ad2 = redisson.getAtomicLong("test2");
assertThat(ad2.getAndDelete()).isZero();
}
@Test @Test
public void testCompareAndSetZero() { public void testCompareAndSetZero() {
RAtomicLong al = redisson.getAtomicLong("test"); RAtomicLong al = redisson.getAtomicLong("test");

@ -121,6 +121,63 @@ public class RedissonBatchTest extends BaseTest {
batch.execute(); batch.execute();
} }
@Test
public void testAtomic() {
RBatch batch = redisson.createBatch();
batch.atomic();
RFuture<Long> f1 = batch.getAtomicLong("A1").addAndGetAsync(1);
RFuture<Long> f2 = batch.getAtomicLong("A2").addAndGetAsync(2);
RFuture<Long> f3 = batch.getAtomicLong("A3").addAndGetAsync(3);
RFuture<Long> d1 = batch.getKeys().deleteAsync("A1", "A2");
BatchResult<?> f = batch.execute();
List<Object> list = (List<Object>) f.getResponses();
assertThat(list).containsExactly(1L, 2L, 3L, 2L);
assertThat(f1.getNow()).isEqualTo(1);
assertThat(f2.getNow()).isEqualTo(2);
assertThat(f3.getNow()).isEqualTo(3);
assertThat(d1.getNow()).isEqualTo(2);
}
@Test
public void testAtomicSyncSlaves() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RBatch batch = redisson.createBatch();
for (int i = 0; i < 10; i++) {
batch.getAtomicLong("{test}" + i).addAndGetAsync(i);
}
batch.atomic();
batch.syncSlaves(1, 1, TimeUnit.SECONDS);
BatchResult<?> result = batch.execute();
assertThat(result.getSyncedSlaves()).isEqualTo(1);
int i = 0;
for (Object res : result.getResponses()) {
assertThat((Long)res).isEqualTo(i++);
}
process.shutdown();
}
@Test @Test
public void testDifferentCodecs() { public void testDifferentCodecs() {
RBatch b = redisson.createBatch(); RBatch b = redisson.createBatch();

@ -12,6 +12,15 @@ import org.redisson.api.RBucket;
public class RedissonBucketTest extends BaseTest { public class RedissonBucketTest extends BaseTest {
@Test
public void testGetAndDelete() {
RBucket<Integer> al = redisson.getBucket("test");
al.set(10);
assertThat(al.getAndDelete()).isEqualTo(10);
assertThat(al.isExists()).isFalse();
assertThat(al.getAndDelete()).isNull();
}
@Test @Test
public void testSize() { public void testSize() {
RBucket<String> bucket = redisson.getBucket("testCompareAndSet"); RBucket<String> bucket = redisson.getBucket("testCompareAndSet");

@ -1,177 +0,0 @@
package org.redisson;
import java.security.SecureRandom;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.junit.Assert;
import org.junit.Test;
public class RedissonConcurrentMapTest extends BaseConcurrentTest {
@Test
public void testSingleReplaceOldValue_SingleInstance() throws InterruptedException {
final String name = "testSingleReplaceOldValue_SingleInstance";
ConcurrentMap<String, String> map = BaseTest.createInstance().getMap(name);
map.put("1", "122");
testSingleInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map1 = r.getMap(name);
map1.replace("1", "122", "32");
map1.replace("1", "0", "31");
});
ConcurrentMap<String, String> testMap = BaseTest.createInstance().getMap(name);
Assert.assertEquals("32", testMap.get("1"));
assertMapSize(1, name);
}
@Test
public void testSingleRemoveValue_SingleInstance() throws InterruptedException {
final String name = "testSingleRemoveValue_SingleInstance";
ConcurrentMap<String, String> map = BaseTest.createInstance().getMap(name);
map.putIfAbsent("1", "0");
testSingleInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map1 = r.getMap(name);
map1.remove("1", "0");
});
assertMapSize(0, name);
}
@Test
public void testSingleReplace_SingleInstance() throws InterruptedException {
final String name = "testSingleReplace_SingleInstance";
ConcurrentMap<String, String> map = BaseTest.createInstance().getMap(name);
map.put("1", "0");
testSingleInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map1 = r.getMap(name);
map1.replace("1", "3");
});
ConcurrentMap<String, String> testMap = BaseTest.createInstance().getMap(name);
Assert.assertEquals("3", testMap.get("1"));
assertMapSize(1, name);
}
@Test
public void test_Multi_Replace_MultiInstance() throws InterruptedException {
final String name = "test_Multi_Replace_MultiInstance";
ConcurrentMap<Integer, Integer> map = BaseTest.createInstance().getMap(name);
for (int i = 0; i < 5; i++) {
map.put(i, 1);
}
final SecureRandom secureRandom = new SecureRandom();
testSingleInstanceConcurrency(100, r -> {
ConcurrentMap<Integer, Integer> map1 = r.getMap(name);
Assert.assertNotNull(map1.replace(secureRandom.nextInt(5), 2));
});
ConcurrentMap<Integer, Integer> testMap = BaseTest.createInstance().getMap(name);
for (Integer value : testMap.values()) {
Assert.assertEquals(2, (int)value);
}
assertMapSize(5, name);
}
@Test
public void test_Multi_RemoveValue_MultiInstance() throws InterruptedException {
final String name = "test_Multi_RemoveValue_MultiInstance";
ConcurrentMap<Integer, Integer> map = BaseTest.createInstance().getMap(name);
for (int i = 0; i < 10; i++) {
map.put(i, 1);
}
final SecureRandom secureRandom = new SecureRandom();
testMultiInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map1 = r.getMap(name);
map1.remove(secureRandom.nextInt(10), 1);
});
assertMapSize(0, name);
}
@Test
public void testSinglePutIfAbsent_SingleInstance() throws InterruptedException {
final String name = "testSinglePutIfAbsent_SingleInstance";
ConcurrentMap<String, String> map = BaseTest.createInstance().getMap(name);
map.putIfAbsent("1", "0");
testSingleInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map1 = r.getMap(name);
map1.putIfAbsent("1", "1");
});
ConcurrentMap<String, String> testMap = BaseTest.createInstance().getMap(name);
Assert.assertEquals("0", testMap.get("1"));
assertMapSize(1, name);
}
@Test
public void testMultiPutIfAbsent_SingleInstance() throws InterruptedException {
final String name = "testMultiPutIfAbsent_SingleInstance";
testSingleInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map = r.getMap(name);
map.putIfAbsent("" + Math.random(), "1");
});
assertMapSize(100, name);
}
@Test
public void testMultiPutIfAbsent_MultiInstance() throws InterruptedException {
final String name = "testMultiPutIfAbsent_MultiInstance";
testMultiInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map = r.getMap(name);
map.putIfAbsent("" + Math.random(), "1");
});
assertMapSize(100, name);
}
private void assertMapSize(int size, String name) {
Map<String, String> map = BaseTest.createInstance().getMap(name);
Assert.assertEquals(size, map.size());
clear(map);
}
@Test
public void testMultiPut_SingleInstance() throws InterruptedException {
final String name = "testMultiPut_SingleInstance";
testSingleInstanceConcurrency(100, r -> {
Map<String, String> map = r.getMap(name);
map.put("" + Math.random(), "1");
});
assertMapSize(100, name);
}
@Test
public void testMultiPut_MultiInstance() throws InterruptedException {
final String name = "testMultiPut_MultiInstance";
testMultiInstanceConcurrency(100, r -> {
ConcurrentMap<String, String> map = r.getMap(name);
map.putIfAbsent("" + Math.random(), "1");
});
assertMapSize(100, name);
}
private void clear(Map<?, ?> map) {
map.clear();
Assert.assertEquals(0, map.size());
}
}

@ -126,24 +126,18 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
return redisson.getLocalCachedMap("test", options); return redisson.getLocalCachedMap("test", options);
} }
// @Test @Test
public void testBigData() throws InterruptedException { public void testBigPutAll() throws InterruptedException {
RLocalCachedMap<Object, Object> m = redisson.getLocalCachedMap("testValuesWithNearCache2", RLocalCachedMap<Object, Object> m = redisson.getLocalCachedMap("testValuesWithNearCache2",
LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU)); LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).syncStrategy(SyncStrategy.INVALIDATE));
for (int i = 0; i < 100; i++) { Map<Object, Object> map = new HashMap<>();
for (int k = 0; k < 1000; k++) { for (int k = 0; k < 10000; k++) {
Map<Object, Object> map = new HashMap<>(); map.put("" + k, "" + k);
map.put("" + k * i, "" + k * i);
m.putAll(map);
}
System.out.println(i);
} }
m.putAll(map);
System.out.println("done"); assertThat(m.size()).isEqualTo(10000);
Thread.sleep(1000000);
} }

@ -1,10 +1,20 @@
package org.redisson; package org.redisson;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.awaitility.Awaitility.await;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -18,10 +28,38 @@ import org.redisson.api.RReadWriteLock;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.config.Config; import org.redisson.config.Config;
import static org.awaitility.Awaitility.*;
public class RedissonReadWriteLockTest extends BaseConcurrentTest { public class RedissonReadWriteLockTest extends BaseConcurrentTest {
@Test
public void testName() throws InterruptedException, ExecutionException, TimeoutException {
ExecutorService service = Executors.newFixedThreadPool(10);
RReadWriteLock rwlock = redisson.getReadWriteLock("{test}:abc:key");
RLock rlock = rwlock.readLock();
List<Callable<Void>> callables = new ArrayList<>();
for (int i = 0; i < 10; i++) {
callables.add(() -> {
for (int j = 0; j < 10; j++) {
rlock.lock();
try {
} finally {
rlock.unlock();
}
}
return null;
});
}
List<Future<Void>> futures = service.invokeAll(callables);
for (Future<Void> future : futures) {
assertThatCode(future::get).doesNotThrowAnyException();
}
service.shutdown();
assertThat(service.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
}
@Test @Test
public void testWriteLockExpiration() throws InterruptedException { public void testWriteLockExpiration() throws InterruptedException {
RReadWriteLock rw1 = redisson.getReadWriteLock("test2s3"); RReadWriteLock rw1 = redisson.getReadWriteLock("test2s3");

@ -5,16 +5,51 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RLexSortedSet;
import org.redisson.api.RScript; import org.redisson.api.RScript;
import org.redisson.api.RScript.Mode; import org.redisson.api.RScript.Mode;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
public class RedissonScriptTest extends BaseTest { public class RedissonScriptTest extends BaseTest {
@Test
public void testMulti() throws InterruptedException, ExecutionException {
RLexSortedSet idx2 = redisson.getLexSortedSet("ABCD17436");
Long l = new Long("1506524856000");
for (int i = 0; i < 100; i++) {
String s = "DENY" + "\t" + "TESTREDISSON" + "\t"
+ Long.valueOf(l) + "\t" + "helloworld_hongqin";
idx2.add(s);
l = l + 1;
}
StringCodec codec = new StringCodec();
String max = "'[DENY" + "\t" + "TESTREDISSON" + "\t" + "1506524856099'";
String min = "'[DENY" + "\t" + "TESTREDISSON" + "\t" + "1506524856000'";
String luaScript1= "local d = {}; d[1] = redis.call('zrevrangebylex','ABCD17436'," +max+","+min+",'LIMIT',0,5); ";
luaScript1= luaScript1 + " d[2] = redis.call('zrevrangebylex','ABCD17436'," +max+","+min+",'LIMIT',0,15); ";
luaScript1= luaScript1 + " d[3] = redis.call('zrevrangebylex','ABCD17436'," +max+","+min+",'LIMIT',0,25); ";
luaScript1 = luaScript1 + " return d;";
Future<Object> r1 = redisson.getScript().evalAsync(RScript.Mode.READ_ONLY, codec,
luaScript1,
RScript.ReturnType.MULTI, Collections.emptyList());
List<List<Object>> obj1 = (List<List<Object>>) r1.get();
assertThat(obj1).hasSize(3);
assertThat(obj1.get(0)).hasSize(5);
assertThat(obj1.get(1)).hasSize(15);
assertThat(obj1.get(2)).hasSize(25);
}
@Test @Test
public void testEval() { public void testEval() {
RScript script = redisson.getScript(); RScript script = redisson.getScript();

@ -255,43 +255,4 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
assertThat(lockedCounter.get()).isEqualTo(iterations); assertThat(lockedCounter.get()).isEqualTo(iterations);
} }
@Test
public void testConcurrency_MultiInstance_10_permits() throws InterruptedException {
Assume.assumeFalse(RedissonRuntimeEnvironment.isTravis);
int iterations = 100;
final AtomicInteger lockedCounter = new AtomicInteger();
RSemaphore s = redisson.getSemaphore("test");
s.trySetPermits(10);
final AtomicInteger checkPermits = new AtomicInteger(s.availablePermits());
final CyclicBarrier barrier = new CyclicBarrier(s.availablePermits());
testMultiInstanceConcurrencySequentiallyLaunched(iterations, r -> {
RSemaphore s1 = r.getSemaphore("test");
try {
s1.acquire();
barrier.await();
if (checkPermits.decrementAndGet() > 0) {
assertThat(s1.availablePermits()).isEqualTo(0);
assertThat(s1.tryAcquire()).isFalse();
} else {
Thread.sleep(50);
}
}catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int value = lockedCounter.get();
lockedCounter.set(value + 1);
s1.release();
});
System.out.println(lockedCounter.get());
assertThat(lockedCounter.get()).isLessThan(iterations);
}
} }

Loading…
Cancel
Save