Feature - GetAndSet method with TTL support added RBucket, RBucketAsync, RBucketReactive, RBucketRx interfaces #1986

pull/2009/head
Nikita Koksharov 6 years ago
parent 90699e0cdf
commit da7ebb5b71

@ -62,7 +62,7 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
+ "return 1 "
+ "else "
+ "return 0 end",
Collections.<Object>singletonList(getName()), encode(expect));
Collections.singletonList(getName()), encode(expect));
}
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
@ -71,7 +71,7 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
+ "return 1 "
+ "else "
+ "return 0 end",
Collections.<Object>singletonList(getName()), encode(expect), encode(update));
Collections.singletonList(getName()), encode(expect), encode(update));
}
@Override
@ -86,7 +86,7 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
"local v = redis.call('get', KEYS[1]); "
+ "redis.call('del', KEYS[1]); "
+ "return v",
Collections.<Object>singletonList(getName()));
Collections.singletonList(getName()));
}
return commandExecutor.writeAsync(getName(), codec, RedisCommands.GETSET, getName(), encode(newValue));
@ -113,7 +113,7 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
"local currValue = redis.call('get', KEYS[1]); "
+ "redis.call('del', KEYS[1]); "
+ "return currValue; ",
Collections.<Object>singletonList(getName()));
Collections.singletonList(getName()));
}
@Override
@ -181,4 +181,19 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
return get(trySetAsync(value));
}
@Override
public RFuture<V> getAndSetAsync(V value, long timeToLive, TimeUnit timeUnit) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT,
"local currValue = redis.call('get', KEYS[1]); "
+ "redis.call('psetex', KEYS[1], ARGV[2], ARGV[1]); "
+ "return currValue; ",
Collections.singletonList(getName()),
encode(value), timeUnit.toMillis(timeToLive));
}
@Override
public V getAndSet(V value, long timeToLive, TimeUnit timeUnit) {
return get(getAndSetAsync(value, timeToLive, timeUnit));
}
}

@ -87,6 +87,16 @@ public interface RBucket<V> extends RExpirable, RBucketAsync<V> {
*/
V getAndSet(V newValue);
/**
* Retrieves current element in the holder and replaces it with <code>newValue</code> with defined <code>timeToLive</code> interval.
*
* @param value - value to set
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return previous value
*/
V getAndSet(V value, long timeToLive, TimeUnit timeUnit);
/**
* Stores element into the holder.
*

@ -87,6 +87,16 @@ public interface RBucketAsync<V> extends RExpirableAsync {
*/
RFuture<V> getAndSetAsync(V newValue);
/**
* Retrieves current element in the holder and replaces it with <code>newValue</code> with defined <code>timeToLive</code> interval.
*
* @param value - value to set
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return previous value
*/
RFuture<V> getAndSetAsync(V value, long timeToLive, TimeUnit timeUnit);
/**
* Stores element into the holder.
*

@ -76,6 +76,16 @@ public interface RBucketReactive<V> extends RExpirableReactive {
*/
Mono<V> getAndSet(V newValue);
/**
* Retrieves current element in the holder and replaces it with <code>newValue</code> with defined <code>timeToLive</code> interval.
*
* @param value - value to set
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return previous value
*/
Mono<V> getAndSet(V value, long timeToLive, TimeUnit timeUnit);
/**
* Retrieves element stored in the holder.
*

@ -77,6 +77,16 @@ public interface RBucketRx<V> extends RExpirableRx {
* @return previous value
*/
Maybe<V> getAndSet(V newValue);
/**
* Retrieves current element in the holder and replaces it with <code>newValue</code> with defined <code>timeToLive</code> interval.
*
* @param value - value to set
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return previous value
*/
Maybe<V> getAndSet(V value, long timeToLive, TimeUnit timeUnit);
/**
* Retrieves element stored in the holder.

@ -288,8 +288,17 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
}
@Override
public RFuture<V> getAndSetAsync(V value, long timeToLive, TimeUnit timeUnit) {
return getAndSet(value, new BucketGetAndSetOperation<V>(getName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId));
}
@Override
public RFuture<V> getAndSetAsync(V value) {
return getAndSet(value, new BucketGetAndSetOperation<V>(getName(), getLockName(), getCodec(), value, transactionId));
}
@SuppressWarnings("unchecked")
public RFuture<V> getAndSetAsync(V newValue) {
private RFuture<V> getAndSet(V newValue, TransactionalOperation operation) {
checkState();
RPromise<V> result = new RedissonPromise<V>();
executeLocked(result, new Runnable() {
@ -302,7 +311,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
} else {
prevValue = state;
}
operations.add(new BucketGetAndSetOperation<V>(getName(), getLockName(), getCodec(), newValue, transactionId));
operations.add(operation);
if (newValue == null) {
state = NULL;
} else {
@ -323,7 +332,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
} else {
state = newValue;
}
operations.add(new BucketGetAndSetOperation<V>(getName(), getLockName(), getCodec(), newValue, transactionId));
operations.add(operation);
result.trySuccess(res);
});
}

@ -15,8 +15,11 @@
*/
package org.redisson.transaction.operation.bucket;
import java.util.concurrent.TimeUnit;
import org.redisson.RedissonBucket;
import org.redisson.RedissonLock;
import org.redisson.api.RBucket;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
@ -33,6 +36,14 @@ public class BucketGetAndSetOperation<V> extends TransactionalOperation {
private Object value;
private String lockName;
private String transactionId;
private long timeToLive;
private TimeUnit timeUnit;
public BucketGetAndSetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit, String transactionId) {
this(name, lockName, codec, value, transactionId);
this.timeToLive = timeToLive;
this.timeUnit = timeUnit;
}
public BucketGetAndSetOperation(String name, String lockName, Codec codec, Object value, String transactionId) {
super(name, codec);
@ -43,8 +54,12 @@ public class BucketGetAndSetOperation<V> extends TransactionalOperation {
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RedissonBucket<V> bucket = new RedissonBucket<V>(codec, commandExecutor, name);
bucket.getAndSetAsync((V) value);
RBucket<V> bucket = new RedissonBucket<V>(codec, commandExecutor, name);
if (timeToLive != 0) {
bucket.getAndSetAsync((V) value, timeToLive, timeUnit);
} else {
bucket.getAndSetAsync((V) value);
}
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}

@ -163,6 +163,18 @@ public class RedissonBucketTest extends BaseTest {
assertThat(r1.isExists()).isFalse();
}
@Test
public void testGetAndSetTTL() throws InterruptedException {
RBucket<String> r1 = redisson.getBucket("getAndSetTTL");
r1.set("value1");
assertThat(r1.getAndSet("value2", 500, TimeUnit.MILLISECONDS)).isEqualTo("value1");
assertThat(r1.get()).isEqualTo("value2");
Thread.sleep(1000);
assertThat(r1.get()).isNull();
}
@Test
public void testGetAndSet() {
RBucket<List<String>> r1 = redisson.getBucket("testGetAndSet");

Loading…
Cancel
Save