diff --git a/redisson/src/main/java/org/redisson/RedissonBucket.java b/redisson/src/main/java/org/redisson/RedissonBucket.java index f730fe909..687269cf0 100644 --- a/redisson/src/main/java/org/redisson/RedissonBucket.java +++ b/redisson/src/main/java/org/redisson/RedissonBucket.java @@ -62,7 +62,7 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { + "return 1 " + "else " + "return 0 end", - Collections.singletonList(getName()), encode(expect)); + Collections.singletonList(getName()), encode(expect)); } return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, @@ -71,7 +71,7 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { + "return 1 " + "else " + "return 0 end", - Collections.singletonList(getName()), encode(expect), encode(update)); + Collections.singletonList(getName()), encode(expect), encode(update)); } @Override @@ -86,7 +86,7 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { "local v = redis.call('get', KEYS[1]); " + "redis.call('del', KEYS[1]); " + "return v", - Collections.singletonList(getName())); + Collections.singletonList(getName())); } return commandExecutor.writeAsync(getName(), codec, RedisCommands.GETSET, getName(), encode(newValue)); @@ -113,7 +113,7 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { "local currValue = redis.call('get', KEYS[1]); " + "redis.call('del', KEYS[1]); " + "return currValue; ", - Collections.singletonList(getName())); + Collections.singletonList(getName())); } @Override @@ -181,4 +181,19 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { return get(trySetAsync(value)); } + @Override + public RFuture getAndSetAsync(V value, long timeToLive, TimeUnit timeUnit) { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT, + "local currValue = redis.call('get', KEYS[1]); " + + "redis.call('psetex', KEYS[1], ARGV[2], ARGV[1]); " + + "return currValue; ", + Collections.singletonList(getName()), + encode(value), timeUnit.toMillis(timeToLive)); + } + + @Override + public V getAndSet(V value, long timeToLive, TimeUnit timeUnit) { + return get(getAndSetAsync(value, timeToLive, timeUnit)); + } + } diff --git a/redisson/src/main/java/org/redisson/api/RBucket.java b/redisson/src/main/java/org/redisson/api/RBucket.java index 21b27f0b3..5bc0a5037 100644 --- a/redisson/src/main/java/org/redisson/api/RBucket.java +++ b/redisson/src/main/java/org/redisson/api/RBucket.java @@ -87,6 +87,16 @@ public interface RBucket extends RExpirable, RBucketAsync { */ V getAndSet(V newValue); + /** + * Retrieves current element in the holder and replaces it with newValue with defined timeToLive interval. + * + * @param value - value to set + * @param timeToLive - time to live interval + * @param timeUnit - unit of time to live interval + * @return previous value + */ + V getAndSet(V value, long timeToLive, TimeUnit timeUnit); + /** * Stores element into the holder. * diff --git a/redisson/src/main/java/org/redisson/api/RBucketAsync.java b/redisson/src/main/java/org/redisson/api/RBucketAsync.java index 6360573d1..6eb28fdf2 100644 --- a/redisson/src/main/java/org/redisson/api/RBucketAsync.java +++ b/redisson/src/main/java/org/redisson/api/RBucketAsync.java @@ -87,6 +87,16 @@ public interface RBucketAsync extends RExpirableAsync { */ RFuture getAndSetAsync(V newValue); + /** + * Retrieves current element in the holder and replaces it with newValue with defined timeToLive interval. + * + * @param value - value to set + * @param timeToLive - time to live interval + * @param timeUnit - unit of time to live interval + * @return previous value + */ + RFuture getAndSetAsync(V value, long timeToLive, TimeUnit timeUnit); + /** * Stores element into the holder. * diff --git a/redisson/src/main/java/org/redisson/api/RBucketReactive.java b/redisson/src/main/java/org/redisson/api/RBucketReactive.java index d6f1f7bdc..d3a66a9b3 100644 --- a/redisson/src/main/java/org/redisson/api/RBucketReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBucketReactive.java @@ -76,6 +76,16 @@ public interface RBucketReactive extends RExpirableReactive { */ Mono getAndSet(V newValue); + /** + * Retrieves current element in the holder and replaces it with newValue with defined timeToLive interval. + * + * @param value - value to set + * @param timeToLive - time to live interval + * @param timeUnit - unit of time to live interval + * @return previous value + */ + Mono getAndSet(V value, long timeToLive, TimeUnit timeUnit); + /** * Retrieves element stored in the holder. * diff --git a/redisson/src/main/java/org/redisson/api/RBucketRx.java b/redisson/src/main/java/org/redisson/api/RBucketRx.java index df5ed9a8f..6db984d8e 100644 --- a/redisson/src/main/java/org/redisson/api/RBucketRx.java +++ b/redisson/src/main/java/org/redisson/api/RBucketRx.java @@ -77,6 +77,16 @@ public interface RBucketRx extends RExpirableRx { * @return previous value */ Maybe getAndSet(V newValue); + + /** + * Retrieves current element in the holder and replaces it with newValue with defined timeToLive interval. + * + * @param value - value to set + * @param timeToLive - time to live interval + * @param timeUnit - unit of time to live interval + * @return previous value + */ + Maybe getAndSet(V value, long timeToLive, TimeUnit timeUnit); /** * Retrieves element stored in the holder. diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java index bfed1c0f7..3f674eccd 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java @@ -288,8 +288,17 @@ public class RedissonTransactionalBucket extends RedissonBucket { } @Override + public RFuture getAndSetAsync(V value, long timeToLive, TimeUnit timeUnit) { + return getAndSet(value, new BucketGetAndSetOperation(getName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId)); + } + + @Override + public RFuture getAndSetAsync(V value) { + return getAndSet(value, new BucketGetAndSetOperation(getName(), getLockName(), getCodec(), value, transactionId)); + } + @SuppressWarnings("unchecked") - public RFuture getAndSetAsync(V newValue) { + private RFuture getAndSet(V newValue, TransactionalOperation operation) { checkState(); RPromise result = new RedissonPromise(); executeLocked(result, new Runnable() { @@ -302,7 +311,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { } else { prevValue = state; } - operations.add(new BucketGetAndSetOperation(getName(), getLockName(), getCodec(), newValue, transactionId)); + operations.add(operation); if (newValue == null) { state = NULL; } else { @@ -323,7 +332,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { } else { state = newValue; } - operations.add(new BucketGetAndSetOperation(getName(), getLockName(), getCodec(), newValue, transactionId)); + operations.add(operation); result.trySuccess(res); }); } diff --git a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndSetOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndSetOperation.java index 9e921ea7e..32f57f694 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndSetOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndSetOperation.java @@ -15,8 +15,11 @@ */ package org.redisson.transaction.operation.bucket; +import java.util.concurrent.TimeUnit; + import org.redisson.RedissonBucket; import org.redisson.RedissonLock; +import org.redisson.api.RBucket; import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.RedissonTransactionalLock; @@ -33,6 +36,14 @@ public class BucketGetAndSetOperation extends TransactionalOperation { private Object value; private String lockName; private String transactionId; + private long timeToLive; + private TimeUnit timeUnit; + + public BucketGetAndSetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit, String transactionId) { + this(name, lockName, codec, value, transactionId); + this.timeToLive = timeToLive; + this.timeUnit = timeUnit; + } public BucketGetAndSetOperation(String name, String lockName, Codec codec, Object value, String transactionId) { super(name, codec); @@ -43,8 +54,12 @@ public class BucketGetAndSetOperation extends TransactionalOperation { @Override public void commit(CommandAsyncExecutor commandExecutor) { - RedissonBucket bucket = new RedissonBucket(codec, commandExecutor, name); - bucket.getAndSetAsync((V) value); + RBucket bucket = new RedissonBucket(codec, commandExecutor, name); + if (timeToLive != 0) { + bucket.getAndSetAsync((V) value, timeToLive, timeUnit); + } else { + bucket.getAndSetAsync((V) value); + } RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); } diff --git a/redisson/src/test/java/org/redisson/RedissonBucketTest.java b/redisson/src/test/java/org/redisson/RedissonBucketTest.java index e2b616c06..51c95a71e 100755 --- a/redisson/src/test/java/org/redisson/RedissonBucketTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBucketTest.java @@ -163,6 +163,18 @@ public class RedissonBucketTest extends BaseTest { assertThat(r1.isExists()).isFalse(); } + @Test + public void testGetAndSetTTL() throws InterruptedException { + RBucket r1 = redisson.getBucket("getAndSetTTL"); + r1.set("value1"); + assertThat(r1.getAndSet("value2", 500, TimeUnit.MILLISECONDS)).isEqualTo("value1"); + assertThat(r1.get()).isEqualTo("value2"); + + Thread.sleep(1000); + + assertThat(r1.get()).isNull(); + } + @Test public void testGetAndSet() { RBucket> r1 = redisson.getBucket("testGetAndSet");