diff --git a/src/main/java/org/redisson/RedissonBucket.java b/src/main/java/org/redisson/RedissonBucket.java index 676faf2f8..0fc887c6e 100644 --- a/src/main/java/org/redisson/RedissonBucket.java +++ b/src/main/java/org/redisson/RedissonBucket.java @@ -15,10 +15,11 @@ */ package org.redisson; -import java.io.IOException; +import java.util.Collections; import java.util.concurrent.TimeUnit; import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.core.RBucket; @@ -27,6 +28,8 @@ import io.netty.util.concurrent.Future; public class RedissonBucket extends RedissonExpirable implements RBucket { + private static final RedisCommand EVAL_GETSET = new RedisCommand("EVAL", 4); + protected RedissonBucket(CommandAsyncExecutor connectionManager, String name) { super(connectionManager, name); } @@ -35,6 +38,73 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { super(codec, connectionManager, name); } + @Override + public boolean compareAndSet(V expect, V update) { + return get(compareAndSetAsync(expect, update)); + } + + @Override + public Future compareAndSetAsync(V expect, V update) { + if (expect == null && update == null) { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, + "return redis.call('exists', KEYS[1]) == 0 then " + + "return 1 " + + "else " + + "return 0 end", + Collections.singletonList(getName())); + } + + if (expect == null) { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, + "if redis.call('exists', KEYS[1]) == 0 then " + + "redis.call('set', KEYS[1], ARGV[1]); " + + "return 1 " + + "else " + + "return 0 end", + Collections.singletonList(getName()), update); + } + + if (update == null) { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, + "if redis.call('get', KEYS[1]) == ARGV[1] then " + + "redis.call('del', KEYS[1]); " + + "return 1 " + + "else " + + "return 0 end", + Collections.singletonList(getName()), expect); + } + + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, + "if redis.call('get', KEYS[1]) == ARGV[1] then " + + "redis.call('set', KEYS[1], ARGV[2]); " + + "return 1 " + + "else " + + "return 0 end", + Collections.singletonList(getName()), expect, update); + } + + @Override + public V getAndSet(V newValue) { + return get(getAndSetAsync(newValue)); + } + + @Override + public Future getAndSetAsync(V newValue) { + if (newValue == null) { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT, + "local v = redis.call('get', KEYS[1]); " + + "redis.call('del', KEYS[1]); " + + "return v", + Collections.singletonList(getName())); + } + + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_GETSET, + "local v = redis.call('get', KEYS[1]); " + + "redis.call('set', KEYS[1], ARGV[1]); " + + "return v", + Collections.singletonList(getName()), newValue); + } + @Override public V get() { return get(getAsync()); @@ -52,6 +122,10 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { @Override public Future setAsync(V value) { + if (value == null) { + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_VOID, getName()); + } + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SET, getName(), value); } @@ -62,6 +136,10 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { @Override public Future setAsync(V value, long timeToLive, TimeUnit timeUnit) { + if (value == null) { + throw new IllegalArgumentException("Value can't be null"); + } + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SETEX, getName(), timeUnit.toSeconds(timeToLive), value); } @@ -87,17 +165,19 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { @Override public Future trySetAsync(V value) { + if (value == null) { + return commandExecutor.readAsync(getName(), codec, RedisCommands.NOT_EXISTS, getName()); + } + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SETNX, getName(), value); } @Override public Future trySetAsync(V value, long timeToLive, TimeUnit timeUnit) { - try { - byte[] state = codec.getValueEncoder().encode(value); - return commandExecutor.writeAsync(getName(), codec, RedisCommands.SETPXNX, getName(), state, "PX", timeUnit.toMillis(timeToLive), "NX"); - } catch (IOException e) { - throw new IllegalArgumentException(e); + if (value == null) { + throw new IllegalArgumentException("Value can't be null"); } + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SETPXNX, getName(), value, "PX", timeUnit.toMillis(timeToLive), "NX"); } @Override diff --git a/src/main/java/org/redisson/RedissonList.java b/src/main/java/org/redisson/RedissonList.java index 492f5876d..33ca451fa 100644 --- a/src/main/java/org/redisson/RedissonList.java +++ b/src/main/java/org/redisson/RedissonList.java @@ -358,7 +358,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public Future containsAsync(Object o) { - return indexOfAsync(o, new BooleanNumberReplayConvertor()); + return indexOfAsync(o, new BooleanNumberReplayConvertor(-1L)); } private Future indexOfAsync(Object o, Convertor convertor) { diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 4d65fc135..bed864786 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -24,6 +24,7 @@ import org.redisson.client.protocol.convertor.BitSetReplayConvertor; import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor; import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor; import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor; +import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.convertor.DoubleReplayConvertor; import org.redisson.client.protocol.convertor.IntegerReplayConvertor; @@ -195,10 +196,11 @@ public interface RedisCommands { RedisCommand GET = new RedisCommand("GET"); RedisCommand SET = new RedisCommand("SET", new VoidReplayConvertor(), 2); - RedisCommand SETPXNX = new RedisCommand("SET", new BooleanNotNullReplayConvertor()); + RedisCommand SETPXNX = new RedisCommand("SET", new BooleanNotNullReplayConvertor(), 2); RedisCommand SETNX = new RedisCommand("SETNX", new BooleanReplayConvertor(), 2); RedisCommand SETEX = new RedisCommand("SETEX", new VoidReplayConvertor(), 3); RedisStrictCommand EXISTS = new RedisStrictCommand("EXISTS", new BooleanReplayConvertor()); + RedisStrictCommand NOT_EXISTS = new RedisStrictCommand("EXISTS", new BooleanNumberReplayConvertor(1L)); RedisStrictCommand RENAMENX = new RedisStrictCommand("RENAMENX", new BooleanReplayConvertor()); RedisStrictCommand RENAME = new RedisStrictCommand("RENAME", new VoidReplayConvertor()); diff --git a/src/main/java/org/redisson/client/protocol/convertor/BooleanNumberReplayConvertor.java b/src/main/java/org/redisson/client/protocol/convertor/BooleanNumberReplayConvertor.java index 916673e05..6169ed902 100644 --- a/src/main/java/org/redisson/client/protocol/convertor/BooleanNumberReplayConvertor.java +++ b/src/main/java/org/redisson/client/protocol/convertor/BooleanNumberReplayConvertor.java @@ -17,9 +17,16 @@ package org.redisson.client.protocol.convertor; public class BooleanNumberReplayConvertor extends SingleConvertor { + private long number; + + public BooleanNumberReplayConvertor(long number) { + super(); + this.number = number; + } + @Override public Boolean convert(Object obj) { - return (Long)obj != -1; + return (Long)obj != number; } diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index e36de9c8c..21108529c 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -63,7 +63,7 @@ import io.netty.util.concurrent.Promise; */ public class CommandAsyncService implements CommandAsyncExecutor { - final Logger log = LoggerFactory.getLogger(getClass()); + private static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class); final ConnectionManager connectionManager; diff --git a/src/main/java/org/redisson/core/RBucket.java b/src/main/java/org/redisson/core/RBucket.java index a21f9f51a..3f57eae1c 100644 --- a/src/main/java/org/redisson/core/RBucket.java +++ b/src/main/java/org/redisson/core/RBucket.java @@ -32,6 +32,10 @@ public interface RBucket extends RExpirable, RBucketAsync { boolean trySet(V value, long timeToLive, TimeUnit timeUnit); + boolean compareAndSet(V expect, V update); + + V getAndSet(V newValue); + void set(V value); void set(V value, long timeToLive, TimeUnit timeUnit); diff --git a/src/main/java/org/redisson/core/RBucketAsync.java b/src/main/java/org/redisson/core/RBucketAsync.java index 9dd6073db..f834089e2 100644 --- a/src/main/java/org/redisson/core/RBucketAsync.java +++ b/src/main/java/org/redisson/core/RBucketAsync.java @@ -34,6 +34,10 @@ public interface RBucketAsync extends RExpirableAsync { Future trySetAsync(V value, long timeToLive, TimeUnit timeUnit); + Future compareAndSetAsync(V expect, V update); + + Future getAndSetAsync(V newValue); + Future setAsync(V value); Future setAsync(V value, long timeToLive, TimeUnit timeUnit); diff --git a/src/main/java/org/redisson/reactive/RedissonListReactive.java b/src/main/java/org/redisson/reactive/RedissonListReactive.java index c58c515b8..008b5ade8 100644 --- a/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -309,7 +309,7 @@ public class RedissonListReactive extends RedissonExpirableReactive implement @Override public Publisher contains(Object o) { - return indexOf(o, new BooleanNumberReplayConvertor()); + return indexOf(o, new BooleanNumberReplayConvertor(-1L)); } private Publisher indexOf(Object o, Convertor convertor) { diff --git a/src/test/java/org/redisson/RedissonBucketTest.java b/src/test/java/org/redisson/RedissonBucketTest.java index 2d02df591..54404a704 100755 --- a/src/test/java/org/redisson/RedissonBucketTest.java +++ b/src/test/java/org/redisson/RedissonBucketTest.java @@ -1,5 +1,7 @@ package org.redisson; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -10,13 +12,41 @@ import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; import org.redisson.core.RBucket; -import static org.assertj.core.api.Assertions.*; public class RedissonBucketTest extends BaseTest { + @Test + public void testCompareAndSet() { + RBucket> r1 = redisson.getBucket("testCompareAndSet"); + assertThat(r1.compareAndSet(null, Arrays.asList("81"))).isTrue(); + assertThat(r1.compareAndSet(null, Arrays.asList("12"))).isFalse(); + + assertThat(r1.compareAndSet(Arrays.asList("81"), Arrays.asList("0"))).isTrue(); + assertThat(r1.get()).isEqualTo(Arrays.asList("0")); + + assertThat(r1.compareAndSet(Arrays.asList("1"), Arrays.asList("2"))).isFalse(); + assertThat(r1.get()).isEqualTo(Arrays.asList("0")); + + assertThat(r1.compareAndSet(Arrays.asList("0"), null)).isTrue(); + assertThat(r1.get()).isNull(); + assertThat(r1.isExists()).isFalse(); + } + + @Test + public void testGetAndSet() { + RBucket> r1 = redisson.getBucket("testGetAndSet"); + assertThat(r1.getAndSet(Arrays.asList("81"))).isNull(); + assertThat(r1.getAndSet(Arrays.asList("1"))).isEqualTo(Arrays.asList("81")); + assertThat(r1.get()).isEqualTo(Arrays.asList("1")); + + assertThat(r1.getAndSet(null)).isEqualTo(Arrays.asList("1")); + assertThat(r1.get()).isNull(); + assertThat(r1.isExists()).isFalse(); + } + @Test public void testTrySet() { - RBucket r1 = redisson.getBucket("12"); + RBucket r1 = redisson.getBucket("testTrySet"); assertThat(r1.trySet("3")).isTrue(); assertThat(r1.trySet("4")).isFalse(); assertThat(r1.get()).isEqualTo("3"); @@ -24,7 +54,7 @@ public class RedissonBucketTest extends BaseTest { @Test public void testTrySetTTL() throws InterruptedException { - RBucket r1 = redisson.getBucket("12"); + RBucket r1 = redisson.getBucket("testTrySetTTL"); assertThat(r1.trySet("3", 500, TimeUnit.MILLISECONDS)).isTrue(); assertThat(r1.trySet("4", 500, TimeUnit.MILLISECONDS)).isFalse(); assertThat(r1.get()).isEqualTo("3");