refactoring

pull/5186/head
Nikita Koksharov 2 years ago
parent 7b05e728d7
commit 81b06b19fe

@ -192,6 +192,20 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.PSETEX, getRawName(), timeUnit.toMillis(timeToLive), encode(value));
}
@Override
public void set(V value, Duration duration) {
get(setAsync(value, duration));
}
@Override
public RFuture<Void> setAsync(V value, Duration duration) {
if (value == null) {
return commandExecutor.writeAsync(getRawName(), RedisCommands.DEL_VOID, getRawName());
}
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.PSETEX, getRawName(), duration.toMillis(), encode(value));
}
@Override
public RFuture<Boolean> trySetAsync(V value) {
if (value == null) {
@ -295,6 +309,35 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.SET_BOOLEAN, getRawName(), encode(value), "PX", timeUnit.toMillis(timeToLive), "XX");
}
@Override
public boolean setIfExists(V value, Duration duration) {
return get(setIfExistsAsync(value, duration));
}
@Override
public RFuture<Boolean> setIfExistsAsync(V value, Duration duration) {
if (value == null) {
throw new IllegalArgumentException("Value can't be null");
}
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.SET_BOOLEAN, getRawName(), encode(value), "PX", duration.toMillis(), "XX");
}
@Override
public V getAndSet(V value, Duration duration) {
return get(getAndSetAsync(value, duration));
}
@Override
public RFuture<V> getAndSetAsync(V value, Duration duration) {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,
"local currValue = redis.call('get', KEYS[1]); "
+ "redis.call('psetex', KEYS[1], ARGV[2], ARGV[1]); "
+ "return currValue; ",
Collections.singletonList(getRawName()),
encode(value), duration.toMillis());
}
@Override
public RFuture<V> getAndSetAsync(V value, long timeToLive, TimeUnit timeUnit) {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,

@ -236,6 +236,23 @@ public class RedissonJsonBucket<V> extends RedissonExpirable implements RJsonBuc
Collections.singletonList(getRawName()), encode(value), timeUnit.toMillis(timeToLive));
}
@Override
public boolean setIfExists(V value, Duration duration) {
return get(setIfExistsAsync(value, duration));
}
@Override
public RFuture<Boolean> setIfExistsAsync(V value, Duration duration) {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
"local currValue = redis.call('json.set', KEYS[1], '$', ARGV[1], 'XX'); " +
"if currValue ~= false then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 1;" +
"end;" +
"return 0; ",
Collections.singletonList(getRawName()), encode(value), duration.toMillis());
}
@Override
public boolean compareAndSet(V expect, V update) {
return get(compareAndSetAsync(expect, update));
@ -367,6 +384,29 @@ public class RedissonJsonBucket<V> extends RedissonExpirable implements RJsonBuc
Collections.singletonList(getRawName()), encode(value), timeUnit.toMillis(timeToLive));
}
@Override
public V getAndSet(V value, Duration duration) {
return get(getAndSetAsync(value, duration));
}
@Override
public RFuture<V> getAndSetAsync(V value, Duration duration) {
if (value == null) {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,
"local v = redis.call('json.get', KEYS[1]); " +
"redis.call('json.del', KEYS[1]); " +
"return v",
Collections.singletonList(getRawName()));
}
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,
"local currValue = redis.call('json.get', KEYS[1]); " +
"redis.call('json.set', KEYS[1], '$', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return currValue; ",
Collections.singletonList(getRawName()), encode(value), duration.toMillis());
}
@Override
public V getAndExpire(Duration duration) {
return get(getAndExpireAsync(duration));
@ -442,6 +482,19 @@ public class RedissonJsonBucket<V> extends RedissonExpirable implements RJsonBuc
Collections.singletonList(getRawName()), encode(value), timeUnit.toMillis(timeToLive));
}
@Override
public void set(V value, Duration duration) {
get(setAsync(value, duration));
}
@Override
public RFuture<Void> setAsync(V value, Duration duration) {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
"redis.call('json.set', KEYS[1], '$', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[2]); ",
Collections.singletonList(getRawName()), encode(value), duration.toMillis());
}
@Override
public void setAndKeepTTL(V value) {
get(setAndKeepTTLAsync(value));

@ -100,7 +100,7 @@ public interface RBucket<V> extends RExpirable, RBucketAsync<V> {
boolean setIfExists(V value);
/**
* Sets value only if object holder already exists.
* Use {@link #setIfExists(Object, Duration)} instead
*
* @param value - value to set
* @param timeToLive - time to live interval
@ -108,8 +108,19 @@ public interface RBucket<V> extends RExpirable, RBucketAsync<V> {
* @return {@code true} if successful, or {@code false} if
* element wasn't set
*/
@Deprecated
boolean setIfExists(V value, long timeToLive, TimeUnit timeUnit);
/**
* Sets <code>value</code> with expiration <code>duration</code> only if object holder already exists.
*
* @param value value to set
* @param duration expiration duration
* @return {@code true} if successful, or {@code false} if
* element wasn't set
*/
boolean setIfExists(V value, Duration duration);
/**
* Atomically sets the value to the given updated value
* only if serialized state of the current value equals
@ -131,15 +142,26 @@ public interface RBucket<V> extends RExpirable, RBucketAsync<V> {
V getAndSet(V newValue);
/**
* Retrieves current element in the holder and replaces it with <code>newValue</code> with defined <code>timeToLive</code> interval.
* Use {@link #getAndSet(Object, Duration)} instead
*
* @param value - value to set
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return previous value
*/
@Deprecated
V getAndSet(V value, long timeToLive, TimeUnit timeUnit);
/**
* Retrieves current element in the holder and replaces it
* with <code>value</code> with defined expiration <code>duration</code>.
*
* @param value value to set
* @param duration expiration duration
* @return previous value
*/
V getAndSet(V value, Duration duration);
/**
* Retrieves current element in the holder and sets an expiration duration for it.
* <p>
@ -177,14 +199,23 @@ public interface RBucket<V> extends RExpirable, RBucketAsync<V> {
void set(V value);
/**
* Stores element into the holder with defined <code>timeToLive</code> interval.
* Use {@link #set(Object, Duration)} instead
*
* @param value - value to set
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
*/
@Deprecated
void set(V value, long timeToLive, TimeUnit timeUnit);
/**
* Stores <code>value</code> into the holder with defined expiration <code>duration</code>.
*
* @param value value to set
* @param duration expiration duration
*/
void set(V value, Duration duration);
/**
* Set value and keep existing TTL.
* <p>

@ -100,7 +100,7 @@ public interface RBucketAsync<V> extends RExpirableAsync {
RFuture<Boolean> setIfExistsAsync(V value);
/**
* Sets value only if it's already exists.
* Use {@link #setIfExistsAsync(Object, Duration)} instead
*
* @param value - value to set
* @param timeToLive - time to live interval
@ -110,6 +110,16 @@ public interface RBucketAsync<V> extends RExpirableAsync {
*/
RFuture<Boolean> setIfExistsAsync(V value, long timeToLive, TimeUnit timeUnit);
/**
* Sets <code>value</code> with expiration <code>duration</code> only if object holder already exists.
*
* @param value value to set
* @param duration expiration duration
* @return {@code true} if successful, or {@code false} if
* element wasn't set
*/
RFuture<Boolean> setIfExistsAsync(V value, Duration duration);
/**
* Atomically sets the value to the given updated value
* only if serialized state of the current value equals
@ -131,7 +141,7 @@ public interface RBucketAsync<V> extends RExpirableAsync {
RFuture<V> getAndSetAsync(V newValue);
/**
* Retrieves current element in the holder and replaces it with <code>newValue</code> with defined <code>timeToLive</code> interval.
* Use {@link #getAndSetAsync(Object, Duration)} instead
*
* @param value - value to set
* @param timeToLive - time to live interval
@ -140,6 +150,16 @@ public interface RBucketAsync<V> extends RExpirableAsync {
*/
RFuture<V> getAndSetAsync(V value, long timeToLive, TimeUnit timeUnit);
/**
* Retrieves current element in the holder and replaces it
* with <code>value</code> with defined expiration <code>duration</code>.
*
* @param value value to set
* @param duration expiration duration
* @return previous value
*/
RFuture<V> getAndSetAsync(V value, Duration duration);
/**
* Retrieves current element in the holder and sets an expiration duration for it.
* <p>
@ -178,7 +198,7 @@ public interface RBucketAsync<V> extends RExpirableAsync {
RFuture<Void> setAsync(V value);
/**
* Stores element into the holder with defined <code>timeToLive</code> interval.
* Use {@link #setAsync(Object, Duration)} instead
*
* @param value - value to set
* @param timeToLive - time to live interval
@ -187,6 +207,14 @@ public interface RBucketAsync<V> extends RExpirableAsync {
*/
RFuture<Void> setAsync(V value, long timeToLive, TimeUnit timeUnit);
/**
* Stores <code>value</code> into the holder with defined expiration <code>duration</code>.
*
* @param value value to set
* @param duration expiration duration
*/
RFuture<Void> setAsync(V value, Duration duration);
/**
* Set value and keep existing TTL.
* <p>

@ -89,7 +89,7 @@ public interface RBucketReactive<V> extends RExpirableReactive {
Mono<Boolean> setIfExists(V value);
/**
* Sets value only if it's already exists.
* Use {@link #setIfExists(Object, Duration)} instead
*
* @param value - value to set
* @param timeToLive - time to live interval
@ -97,8 +97,19 @@ public interface RBucketReactive<V> extends RExpirableReactive {
* @return {@code true} if successful, or {@code false} if
* element wasn't set
*/
@Deprecated
Mono<Boolean> setIfExists(V value, long timeToLive, TimeUnit timeUnit);
/**
* Sets <code>value</code> with expiration <code>duration</code> only if object holder already exists.
*
* @param value value to set
* @param duration expiration duration
* @return {@code true} if successful, or {@code false} if
* element wasn't set
*/
Mono<Boolean> setIfExists(V value, Duration duration);
/**
* Atomically sets the value to the given updated value
* only if serialized state of the current value equals
@ -120,15 +131,26 @@ public interface RBucketReactive<V> extends RExpirableReactive {
Mono<V> getAndSet(V newValue);
/**
* Retrieves current element in the holder and replaces it with <code>newValue</code> with defined <code>timeToLive</code> interval.
* Use {@link #getAndSet(Object, Duration)} instead
*
* @param value - value to set
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return previous value
*/
@Deprecated
Mono<V> getAndSet(V value, long timeToLive, TimeUnit timeUnit);
/**
* Retrieves current element in the holder and replaces it
* with <code>value</code> with defined expiration <code>duration</code>.
*
* @param value value to set
* @param duration expiration duration
* @return previous value
*/
Mono<V> getAndSet(V value, Duration duration);
/**
* Retrieves current element in the holder and sets an expiration duration for it.
* <p>
@ -181,15 +203,24 @@ public interface RBucketReactive<V> extends RExpirableReactive {
Mono<Void> set(V value);
/**
* Stores element into the holder with defined <code>timeToLive</code> interval.
* Use {@link #set(Object, Duration)} instead
*
* @param value - value to set
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return void
*/
@Deprecated
Mono<Void> set(V value, long timeToLive, TimeUnit timeUnit);
/**
* Stores <code>value</code> into the holder with defined expiration <code>duration</code>.
*
* @param value value to set
* @param duration expiration duration
*/
Mono<Void> set(V value, Duration duration);
/**
* Set value and keep existing TTL.
* <p>

@ -91,7 +91,7 @@ public interface RBucketRx<V> extends RExpirableRx {
Single<Boolean> setIfExists(V value);
/**
* Sets value only if it's already exists.
* Use {@link #setIfExists(Object, Duration)} instead
*
* @param value - value to set
* @param timeToLive - time to live interval
@ -99,8 +99,19 @@ public interface RBucketRx<V> extends RExpirableRx {
* @return {@code true} if successful, or {@code false} if
* element wasn't set
*/
@Deprecated
Single<Boolean> setIfExists(V value, long timeToLive, TimeUnit timeUnit);
/**
* Sets <code>value</code> with expiration <code>duration</code> only if object holder already exists.
*
* @param value value to set
* @param duration expiration duration
* @return {@code true} if successful, or {@code false} if
* element wasn't set
*/
Single<Boolean> setIfExists(V value, Duration duration);
/**
* Atomically sets the value to the given updated value
* only if serialized state of the current value equals
@ -122,15 +133,26 @@ public interface RBucketRx<V> extends RExpirableRx {
Maybe<V> getAndSet(V newValue);
/**
* Retrieves current element in the holder and replaces it with <code>newValue</code> with defined <code>timeToLive</code> interval.
* Use {@link #getAndSet(Object, Duration)} instead
*
* @param value - value to set
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return previous value
*/
@Deprecated
Maybe<V> getAndSet(V value, long timeToLive, TimeUnit timeUnit);
/**
* Retrieves current element in the holder and replaces it
* with <code>value</code> with defined expiration <code>duration</code>.
*
* @param value value to set
* @param duration expiration duration
* @return previous value
*/
Maybe<V> getAndSet(V value, Duration duration);
/**
* Retrieves current element in the holder and sets an expiration duration for it.
* <p>
@ -183,15 +205,24 @@ public interface RBucketRx<V> extends RExpirableRx {
Completable set(V value);
/**
* Stores element into the holder with defined <code>timeToLive</code> interval.
* Use {@link #set(Object, Duration)} instead
*
* @param value - value to set
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return void
*/
@Deprecated
Completable set(V value, long timeToLive, TimeUnit timeUnit);
/**
* Stores <code>value</code> into the holder with defined expiration <code>duration</code>.
*
* @param value value to set
* @param duration expiration duration
*/
Completable set(V value, Duration duration);
/**
* Set value and keep existing TTL.
* <p>

@ -30,7 +30,7 @@ public class RedissonBucketTest extends BaseTest {
Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("6.2.0") > 0);
RBucket<Integer> al = redisson.getBucket("test");
al.set(1, 1, TimeUnit.SECONDS);
al.set(1, Duration.ofSeconds(1));
assertThat(al.getAndClearExpire()).isEqualTo(1);
assertThat(al.remainTimeToLive()).isEqualTo(-1);
}
@ -72,7 +72,7 @@ public class RedissonBucketTest extends BaseTest {
Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("6.0.0") > 0);
RBucket<Integer> al = redisson.getBucket("test");
al.set(1234, 10, TimeUnit.SECONDS);
al.set(1234, Duration.ofSeconds(10));
al.setAndKeepTTL(222);
assertThat(al.remainTimeToLive()).isGreaterThan(9900);
assertThat(al.get()).isEqualTo(222);
@ -167,7 +167,7 @@ public class RedissonBucketTest extends BaseTest {
RedissonClient redisson = Redisson.create(config);
RBucket<Integer> al = redisson.getBucket("test");
al.set(1, 3, TimeUnit.SECONDS);
al.set(1, Duration.ofSeconds(3));
CountDownLatch latch = new CountDownLatch(1);
al.addListener(new ExpiredObjectListener() {
@Override
@ -264,7 +264,7 @@ public class RedissonBucketTest extends BaseTest {
public void testGetAndSetTTL() throws InterruptedException {
RBucket<String> r1 = redisson.getBucket("getAndSetTTL");
r1.set("value1");
assertThat(r1.getAndSet("value2", 500, TimeUnit.MILLISECONDS)).isEqualTo("value1");
assertThat(r1.getAndSet("value2", Duration.ofMillis(500))).isEqualTo("value1");
assertThat(r1.get()).isEqualTo("value2");
Thread.sleep(1000);
@ -295,7 +295,7 @@ public class RedissonBucketTest extends BaseTest {
RBucket<String> r2 = redisson.getBucket("test2");
r2.set("1");
assertThat(r2.setIfExists("2", 1, TimeUnit.SECONDS)).isTrue();
assertThat(r2.setIfExists("2", Duration.ofSeconds(1))).isTrue();
assertThat(r2.get()).isEqualTo("2");
Thread.sleep(1000);
assertThat(r2.isExists()).isFalse();
@ -324,7 +324,7 @@ public class RedissonBucketTest extends BaseTest {
@Test
public void testExpire() throws InterruptedException {
RBucket<String> bucket = redisson.getBucket("test1");
bucket.set("someValue", 1, TimeUnit.SECONDS);
bucket.set("someValue", Duration.ofSeconds(1));
Thread.sleep(1100);
@ -425,7 +425,7 @@ public class RedissonBucketTest extends BaseTest {
Assertions.assertEquals(value, bucket.get());
bucket.set(null);
bucket.set(null, 1, TimeUnit.DAYS);
bucket.set(null, Duration.ofDays(1));
assertThat(bucket.isExists()).isFalse();
}

Loading…
Cancel
Save