Feature - RedissonTransactionalSet and RedissonTransactionalSetCache support expire(), expireAt() and clearExpire() methods. #4249

pull/4385/head
Nikita Koksharov 3 years ago
parent d729a4a7aa
commit d5c4e1d031

@ -15,18 +15,16 @@
*/
package org.redisson.transaction;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.redisson.RedissonMultiLock;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.misc.CompletableFutureWrapper;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
*
* @author Nikita Koksharov
@ -34,26 +32,6 @@ import org.redisson.misc.CompletableFutureWrapper;
*/
public class BaseTransactionalObject {
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
throw new UnsupportedOperationException("expire method is not supported in transaction");
}
public RFuture<Boolean> expireAtAsync(Date timestamp) {
throw new UnsupportedOperationException("expire method is not supported in transaction");
}
public RFuture<Boolean> expireAtAsync(Instant timestamp) {
throw new UnsupportedOperationException("expire method is not supported in transaction");
}
public RFuture<Boolean> expireAtAsync(long timestamp) {
throw new UnsupportedOperationException("expire method is not supported in transaction");
}
public RFuture<Boolean> clearExpireAsync() {
throw new UnsupportedOperationException("clearExpire method is not supported in transaction");
}
public RFuture<Boolean> moveAsync(int database) {
throw new UnsupportedOperationException("move method is not supported in transaction");
}

@ -25,16 +25,14 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
import org.redisson.transaction.operation.DeleteOperation;
import org.redisson.transaction.operation.TouchOperation;
import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.UnlinkOperation;
import org.redisson.transaction.operation.*;
import org.redisson.transaction.operation.set.MoveOperation;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
@ -57,6 +55,8 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
Boolean deleted;
String transactionId;
boolean hasExpiration;
public BaseTransactionalSet(CommandAsyncExecutor commandExecutor, long timeout, List<TransactionalOperation> operations,
RCollectionAsync<V> set, String transactionId) {
this.commandExecutor = commandExecutor;
@ -105,8 +105,7 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
return set.isExistsAsync().thenApply(exists -> {
operations.add(new TouchOperation(name, null, getLockName(), currentThreadId, transactionId));
if (!exists) {
boolean notExists = state.values().stream().noneMatch(v -> v != NULL);
return !notExists;
return isExists();
}
return exists;
});
@ -455,4 +454,60 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
return executeLocked(timeout, runnable, locks);
}
public RFuture<Boolean> clearExpireAsync() {
long currentThreadId = Thread.currentThread().getId();
return executeLocked(timeout, () -> {
if (hasExpiration) {
operations.add(new ClearExpireOperation(name, null, getLockName(), currentThreadId, transactionId));
hasExpiration = false;
return CompletableFuture.completedFuture(true);
}
return set.remainTimeToLiveAsync().thenApply(res -> {
operations.add(new ClearExpireOperation(name, null, getLockName(), currentThreadId, transactionId));
hasExpiration = false;
return res > 0;
});
}, getWriteLock());
}
private boolean isExists() {
boolean notExists = state.values().stream().noneMatch(v -> v != NULL);
return !notExists;
}
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
long currentThreadId = Thread.currentThread().getId();
return executeLocked(timeout, () -> {
if (isExists()) {
operations.add(new ExpireOperation(name, null, getLockName(), currentThreadId, transactionId, timeToLive, timeUnit, param, keys));
hasExpiration = true;
return CompletableFuture.completedFuture(true);
}
return isExistsAsync().thenApply(res -> {
operations.add(new ExpireOperation(name, null, getLockName(), currentThreadId, transactionId, timeToLive, timeUnit, param, keys));
hasExpiration = res;
return res;
});
}, getWriteLock());
}
public RFuture<Boolean> expireAtAsync(long timestamp, String param, String... keys) {
long currentThreadId = Thread.currentThread().getId();
return executeLocked(timeout, () -> {
if (isExists()) {
operations.add(new ExpireAtOperation(name, null, getLockName(), currentThreadId, transactionId, timestamp, param, keys));
hasExpiration = true;
return CompletableFuture.completedFuture(true);
}
return isExistsAsync().thenApply(res -> {
operations.add(new ExpireAtOperation(name, null, getLockName(), currentThreadId, transactionId, timestamp, param, keys));
hasExpiration = res;
return res;
});
}, getWriteLock());
}
}

@ -59,17 +59,17 @@ public class RedissonTransactionalSet<V> extends RedissonSet<V> {
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
throw new UnsupportedOperationException("expire method is not supported in transaction");
return transactionalSet.expireAsync(timeToLive, timeUnit, param, keys);
}
@Override
protected RFuture<Boolean> expireAtAsync(long timestamp, String param, String... keys) {
throw new UnsupportedOperationException("expire method is not supported in transaction");
return transactionalSet.expireAtAsync(timestamp, param, keys);
}
@Override
public RFuture<Boolean> clearExpireAsync() {
throw new UnsupportedOperationException("clearExpire method is not supported in transaction");
return transactionalSet.clearExpireAsync();
}
@Override

@ -57,20 +57,19 @@ public class RedissonTransactionalSetCache<V> extends RedissonSetCache<V> {
this.transactionalSet = new TransactionalSetCache<V>(commandExecutor, timeout, operations, innerSet, transactionId);
}
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
throw new UnsupportedOperationException("expire method is not supported in transaction");
return transactionalSet.expireAsync(timeToLive, timeUnit, param, keys);
}
@Override
protected RFuture<Boolean> expireAtAsync(long timestamp, String param, String... keys) {
throw new UnsupportedOperationException("expire method is not supported in transaction");
return transactionalSet.expireAtAsync(timestamp, param, keys);
}
@Override
public RFuture<Boolean> clearExpireAsync() {
throw new UnsupportedOperationException("clearExpire method is not supported in transaction");
return transactionalSet.clearExpireAsync();
}
@Override

@ -20,6 +20,7 @@ import org.redisson.RedissonLock;
import org.redisson.api.RKeys;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
import org.redisson.transaction.RedissonTransactionalWriteLock;
/**
*
@ -28,17 +29,21 @@ import org.redisson.transaction.RedissonTransactionalLock;
*/
public class ClearExpireOperation extends TransactionalOperation {
private String writeLockName;
private String lockName;
private String transactionId;
public ClearExpireOperation(String name) {
this(name, null, 0, null);
public ClearExpireOperation(String name, String lockName, long threadId, String transactionId) {
super(name, null, threadId);
this.lockName = lockName;
this.transactionId = transactionId;
}
public ClearExpireOperation(String name, String lockName, long threadId, String transactionId) {
public ClearExpireOperation(String name, String lockName, String writeLockName, long threadId, String transactionId) {
super(name, null, threadId);
this.lockName = lockName;
this.transactionId = transactionId;
this.writeLockName = writeLockName;
}
@Override
@ -49,6 +54,10 @@ public class ClearExpireOperation extends TransactionalOperation {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
if (writeLockName != null) {
RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
@Override
@ -57,6 +66,10 @@ public class ClearExpireOperation extends TransactionalOperation {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
if (writeLockName != null) {
RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
public String getLockName() {

@ -20,6 +20,7 @@ import org.redisson.RedissonLock;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
import org.redisson.transaction.RedissonTransactionalWriteLock;
/**
*
@ -42,6 +43,7 @@ public class ExpireAtOperation extends TransactionalOperation {
}
private String writeLockName;
private String lockName;
private String transactionId;
private long timestamp;
@ -61,6 +63,16 @@ public class ExpireAtOperation extends TransactionalOperation {
this.keys = keys;
}
public ExpireAtOperation(String name, String lockName, String writeLockName, long threadId, String transactionId, long timestamp, String param, String... keys) {
super(name, null, threadId);
this.lockName = lockName;
this.transactionId = transactionId;
this.timestamp = timestamp;
this.param = param;
this.keys = keys;
this.writeLockName = writeLockName;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RedissonBucketExtended bucket = new RedissonBucketExtended(commandExecutor, name);
@ -69,6 +81,10 @@ public class ExpireAtOperation extends TransactionalOperation {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
if (writeLockName != null) {
RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
@Override
@ -77,6 +93,10 @@ public class ExpireAtOperation extends TransactionalOperation {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
if (writeLockName != null) {
RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
public String getLockName() {

@ -20,6 +20,7 @@ import org.redisson.RedissonLock;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
import org.redisson.transaction.RedissonTransactionalWriteLock;
import java.util.concurrent.TimeUnit;
@ -42,7 +43,7 @@ public class ExpireOperation extends TransactionalOperation {
}
}
private String writeLockName;
private String lockName;
private String transactionId;
private long timeToLive;
@ -64,6 +65,17 @@ public class ExpireOperation extends TransactionalOperation {
this.keys = keys;
}
public ExpireOperation(String name, String lockName, String writeLockName, long threadId, String transactionId, long timeToLive, TimeUnit timeUnit, String param, String... keys) {
super(name, null, threadId);
this.lockName = lockName;
this.transactionId = transactionId;
this.timeToLive = timeToLive;
this.timeUnit = timeUnit;
this.param = param;
this.keys = keys;
this.writeLockName = writeLockName;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RedissonBucketExtended bucket = new RedissonBucketExtended(commandExecutor, name);
@ -72,6 +84,10 @@ public class ExpireOperation extends TransactionalOperation {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
if (writeLockName != null) {
RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
@Override
@ -80,6 +96,10 @@ public class ExpireOperation extends TransactionalOperation {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
if (writeLockName != null) {
RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
public String getLockName() {

@ -1,16 +1,17 @@
package org.redisson.transaction;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashSet;
import java.util.Set;
import org.junit.jupiter.api.Test;
import org.redisson.BaseTest;
import org.redisson.api.RSet;
import org.redisson.api.RTransaction;
import org.redisson.api.TransactionOptions;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonTransactionalSetTest extends BaseTest {
@Test
@ -111,5 +112,29 @@ public class RedissonTransactionalSetTest extends BaseTest {
assertThat(s.contains("3")).isFalse();
}
@Test
public void testExpire() throws InterruptedException {
RSet<String> s = redisson.getSet("test");
s.add("123");
RTransaction transaction = redisson.createTransaction(TransactionOptions.defaults());
RSet<String> set = transaction.getSet("test");
assertThat(set.clearExpire()).isFalse();
assertThat(set.expire(Duration.ofSeconds(2))).isTrue();
assertThat(set.clearExpire()).isTrue();
transaction.commit();
Thread.sleep(2200);
assertThat(s).containsOnly("123");
RTransaction transaction2 = redisson.createTransaction(TransactionOptions.defaults());
RSet<String> set2 = transaction2.getSet("test");
assertThat(set2.expire(Duration.ofSeconds(1))).isTrue();
transaction2.commit();
Thread.sleep(1100);
assertThat(s.isExists()).isFalse();
}
}

Loading…
Cancel
Save