RBucket.compareAndSet and RBucket.getAndSet methods added. #348

pull/365/head
Nikita 9 years ago
parent 2fc7d7cf93
commit aaea0e6984

@ -15,10 +15,11 @@
*/
package org.redisson;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.core.RBucket;
@ -27,6 +28,8 @@ import io.netty.util.concurrent.Future;
public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
private static final RedisCommand<Object> EVAL_GETSET = new RedisCommand<Object>("EVAL", 4);
protected RedissonBucket(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
@ -35,6 +38,73 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
super(codec, connectionManager, name);
}
@Override
public boolean compareAndSet(V expect, V update) {
return get(compareAndSetAsync(expect, update));
}
@Override
public Future<Boolean> compareAndSetAsync(V expect, V update) {
if (expect == null && update == null) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"return redis.call('exists', KEYS[1]) == 0 then "
+ "return 1 "
+ "else "
+ "return 0 end",
Collections.<Object>singletonList(getName()));
}
if (expect == null) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"if redis.call('exists', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[1], ARGV[1]); "
+ "return 1 "
+ "else "
+ "return 0 end",
Collections.<Object>singletonList(getName()), update);
}
if (update == null) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"if redis.call('get', KEYS[1]) == ARGV[1] then "
+ "redis.call('del', KEYS[1]); "
+ "return 1 "
+ "else "
+ "return 0 end",
Collections.<Object>singletonList(getName()), expect);
}
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"if redis.call('get', KEYS[1]) == ARGV[1] then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "return 1 "
+ "else "
+ "return 0 end",
Collections.<Object>singletonList(getName()), expect, update);
}
@Override
public V getAndSet(V newValue) {
return get(getAndSetAsync(newValue));
}
@Override
public Future<V> getAndSetAsync(V newValue) {
if (newValue == null) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT,
"local v = redis.call('get', KEYS[1]); "
+ "redis.call('del', KEYS[1]); "
+ "return v",
Collections.<Object>singletonList(getName()));
}
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_GETSET,
"local v = redis.call('get', KEYS[1]); "
+ "redis.call('set', KEYS[1], ARGV[1]); "
+ "return v",
Collections.<Object>singletonList(getName()), newValue);
}
@Override
public V get() {
return get(getAsync());
@ -52,6 +122,10 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
@Override
public Future<Void> setAsync(V value) {
if (value == null) {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_VOID, getName());
}
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SET, getName(), value);
}
@ -62,6 +136,10 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
@Override
public Future<Void> setAsync(V value, long timeToLive, TimeUnit timeUnit) {
if (value == null) {
throw new IllegalArgumentException("Value can't be null");
}
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SETEX, getName(), timeUnit.toSeconds(timeToLive), value);
}
@ -87,17 +165,19 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
@Override
public Future<Boolean> trySetAsync(V value) {
if (value == null) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.NOT_EXISTS, getName());
}
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SETNX, getName(), value);
}
@Override
public Future<Boolean> trySetAsync(V value, long timeToLive, TimeUnit timeUnit) {
try {
byte[] state = codec.getValueEncoder().encode(value);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SETPXNX, getName(), state, "PX", timeUnit.toMillis(timeToLive), "NX");
} catch (IOException e) {
throw new IllegalArgumentException(e);
if (value == null) {
throw new IllegalArgumentException("Value can't be null");
}
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SETPXNX, getName(), value, "PX", timeUnit.toMillis(timeToLive), "NX");
}
@Override

@ -358,7 +358,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public Future<Boolean> containsAsync(Object o) {
return indexOfAsync(o, new BooleanNumberReplayConvertor());
return indexOfAsync(o, new BooleanNumberReplayConvertor(-1L));
}
private <R> Future<R> indexOfAsync(Object o, Convertor<R> convertor) {

@ -24,6 +24,7 @@ import org.redisson.client.protocol.convertor.BitSetReplayConvertor;
import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.DoubleReplayConvertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
@ -195,10 +196,11 @@ public interface RedisCommands {
RedisCommand<Object> GET = new RedisCommand<Object>("GET");
RedisCommand<Void> SET = new RedisCommand<Void>("SET", new VoidReplayConvertor(), 2);
RedisCommand<Boolean> SETPXNX = new RedisCommand<Boolean>("SET", new BooleanNotNullReplayConvertor());
RedisCommand<Boolean> SETPXNX = new RedisCommand<Boolean>("SET", new BooleanNotNullReplayConvertor(), 2);
RedisCommand<Boolean> SETNX = new RedisCommand<Boolean>("SETNX", new BooleanReplayConvertor(), 2);
RedisCommand<Void> SETEX = new RedisCommand<Void>("SETEX", new VoidReplayConvertor(), 3);
RedisStrictCommand<Boolean> EXISTS = new RedisStrictCommand<Boolean>("EXISTS", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> NOT_EXISTS = new RedisStrictCommand<Boolean>("EXISTS", new BooleanNumberReplayConvertor(1L));
RedisStrictCommand<Boolean> RENAMENX = new RedisStrictCommand<Boolean>("RENAMENX", new BooleanReplayConvertor());
RedisStrictCommand<Void> RENAME = new RedisStrictCommand<Void>("RENAME", new VoidReplayConvertor());

@ -17,9 +17,16 @@ package org.redisson.client.protocol.convertor;
public class BooleanNumberReplayConvertor extends SingleConvertor<Boolean> {
private long number;
public BooleanNumberReplayConvertor(long number) {
super();
this.number = number;
}
@Override
public Boolean convert(Object obj) {
return (Long)obj != -1;
return (Long)obj != number;
}

@ -63,7 +63,7 @@ import io.netty.util.concurrent.Promise;
*/
public class CommandAsyncService implements CommandAsyncExecutor {
final Logger log = LoggerFactory.getLogger(getClass());
private static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class);
final ConnectionManager connectionManager;

@ -32,6 +32,10 @@ public interface RBucket<V> extends RExpirable, RBucketAsync<V> {
boolean trySet(V value, long timeToLive, TimeUnit timeUnit);
boolean compareAndSet(V expect, V update);
V getAndSet(V newValue);
void set(V value);
void set(V value, long timeToLive, TimeUnit timeUnit);

@ -34,6 +34,10 @@ public interface RBucketAsync<V> extends RExpirableAsync {
Future<Boolean> trySetAsync(V value, long timeToLive, TimeUnit timeUnit);
Future<Boolean> compareAndSetAsync(V expect, V update);
Future<V> getAndSetAsync(V newValue);
Future<Void> setAsync(V value);
Future<Void> setAsync(V value, long timeToLive, TimeUnit timeUnit);

@ -309,7 +309,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
@Override
public Publisher<Boolean> contains(Object o) {
return indexOf(o, new BooleanNumberReplayConvertor());
return indexOf(o, new BooleanNumberReplayConvertor(-1L));
}
private <R> Publisher<R> indexOf(Object o, Convertor<R> convertor) {

@ -1,5 +1,7 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@ -10,13 +12,41 @@ import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RBucket;
import static org.assertj.core.api.Assertions.*;
public class RedissonBucketTest extends BaseTest {
@Test
public void testCompareAndSet() {
RBucket<List<String>> r1 = redisson.getBucket("testCompareAndSet");
assertThat(r1.compareAndSet(null, Arrays.asList("81"))).isTrue();
assertThat(r1.compareAndSet(null, Arrays.asList("12"))).isFalse();
assertThat(r1.compareAndSet(Arrays.asList("81"), Arrays.asList("0"))).isTrue();
assertThat(r1.get()).isEqualTo(Arrays.asList("0"));
assertThat(r1.compareAndSet(Arrays.asList("1"), Arrays.asList("2"))).isFalse();
assertThat(r1.get()).isEqualTo(Arrays.asList("0"));
assertThat(r1.compareAndSet(Arrays.asList("0"), null)).isTrue();
assertThat(r1.get()).isNull();
assertThat(r1.isExists()).isFalse();
}
@Test
public void testGetAndSet() {
RBucket<List<String>> r1 = redisson.getBucket("testGetAndSet");
assertThat(r1.getAndSet(Arrays.asList("81"))).isNull();
assertThat(r1.getAndSet(Arrays.asList("1"))).isEqualTo(Arrays.asList("81"));
assertThat(r1.get()).isEqualTo(Arrays.asList("1"));
assertThat(r1.getAndSet(null)).isEqualTo(Arrays.asList("1"));
assertThat(r1.get()).isNull();
assertThat(r1.isExists()).isFalse();
}
@Test
public void testTrySet() {
RBucket<String> r1 = redisson.getBucket("12");
RBucket<String> r1 = redisson.getBucket("testTrySet");
assertThat(r1.trySet("3")).isTrue();
assertThat(r1.trySet("4")).isFalse();
assertThat(r1.get()).isEqualTo("3");
@ -24,7 +54,7 @@ public class RedissonBucketTest extends BaseTest {
@Test
public void testTrySetTTL() throws InterruptedException {
RBucket<String> r1 = redisson.getBucket("12");
RBucket<String> r1 = redisson.getBucket("testTrySetTTL");
assertThat(r1.trySet("3", 500, TimeUnit.MILLISECONDS)).isTrue();
assertThat(r1.trySet("4", 500, TimeUnit.MILLISECONDS)).isFalse();
assertThat(r1.get()).isEqualTo("3");

Loading…
Cancel
Save