From 46cf07ba270c70d669b7b2ed8464c6f5fad7f298 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 30 Jun 2022 11:25:54 +0300 Subject: [PATCH] Feature - RedissonTransactionalMap and RedissonTransactionalMapCache support expire(), expireAt() and clearExpire() methods. #4249 --- .../transaction/BaseTransactionalMap.java | 80 +++++++++++++++---- .../transaction/BaseTransactionalObject.java | 23 ++++++ .../transaction/BaseTransactionalSet.java | 41 +++------- .../transaction/RedissonTransactionalMap.java | 6 +- .../RedissonTransactionalMapCache.java | 6 +- .../transaction/TransactionalSet.java | 4 +- 6 files changed, 107 insertions(+), 53 deletions(-) diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java index 0d29bbcd6..51c4e8fa7 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java @@ -29,10 +29,7 @@ import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.Hash; import org.redisson.misc.HashValue; -import org.redisson.transaction.operation.DeleteOperation; -import org.redisson.transaction.operation.TouchOperation; -import org.redisson.transaction.operation.TransactionalOperation; -import org.redisson.transaction.operation.UnlinkOperation; +import org.redisson.transaction.operation.*; import org.redisson.transaction.operation.map.*; import java.math.BigDecimal; @@ -40,6 +37,7 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -80,17 +78,14 @@ public class BaseTransactionalMap extends BaseTransactionalObject { final List operations; final Map state = new HashMap<>(); final RMap map; - final CommandAsyncExecutor commandExecutor; - final String transactionId; Boolean deleted; - + boolean hasExpiration; + public BaseTransactionalMap(CommandAsyncExecutor commandExecutor, long timeout, List operations, RMap map, String transactionId) { - super(); + super(transactionId, getLockName(map.getName()), commandExecutor); this.timeout = timeout; this.operations = operations; this.map = map; - this.commandExecutor = commandExecutor; - this.transactionId = transactionId; } HashValue toKeyHash(Object key) { @@ -123,14 +118,9 @@ public class BaseTransactionalMap extends BaseTransactionalObject { CompletionStage f = map.isExistsAsync().thenApply(exists -> { operations.add(new TouchOperation(map.getName())); if (!exists) { - for (MapEntry entry : state.values()) { - if (entry != MapEntry.NULL) { - exists = true; - break; - } - } + return isExists(); } - return exists; + return true; }); return new CompletableFutureWrapper<>(f); } @@ -775,4 +765,60 @@ public class BaseTransactionalMap extends BaseTransactionalObject { return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); } + private boolean isExists() { + boolean notExists = state.values().stream().noneMatch(v -> v != MapEntry.NULL); + return !notExists; + } + + public RFuture expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) { + long currentThreadId = Thread.currentThread().getId(); + return executeLocked(timeout, () -> { + if (isExists()) { + operations.add(new ExpireOperation(map.getName(), null, lockName, currentThreadId, transactionId, timeToLive, timeUnit, param, keys)); + hasExpiration = true; + return CompletableFuture.completedFuture(true); + } + + return isExistsAsync().thenApply(res -> { + operations.add(new ExpireOperation(map.getName(), null, lockName, currentThreadId, transactionId, timeToLive, timeUnit, param, keys)); + hasExpiration = res; + return res; + }); + }, getWriteLock()); + } + + public RFuture expireAtAsync(long timestamp, String param, String... keys) { + long currentThreadId = Thread.currentThread().getId(); + return executeLocked(timeout, () -> { + if (isExists()) { + operations.add(new ExpireAtOperation(map.getName(), null, lockName, currentThreadId, transactionId, timestamp, param, keys)); + hasExpiration = true; + return CompletableFuture.completedFuture(true); + } + + return isExistsAsync().thenApply(res -> { + operations.add(new ExpireAtOperation(map.getName(), null, lockName, currentThreadId, transactionId, timestamp, param, keys)); + hasExpiration = res; + return res; + }); + }, getWriteLock()); + } + + public RFuture clearExpireAsync() { + long currentThreadId = Thread.currentThread().getId(); + return executeLocked(timeout, () -> { + if (hasExpiration) { + operations.add(new ClearExpireOperation(map.getName(), null, lockName, currentThreadId, transactionId)); + hasExpiration = false; + return CompletableFuture.completedFuture(true); + } + + return map.remainTimeToLiveAsync().thenApply(res -> { + operations.add(new ClearExpireOperation(map.getName(), null, lockName, currentThreadId, transactionId)); + hasExpiration = false; + return res > 0; + }); + }, getWriteLock()); + } + } diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalObject.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalObject.java index dbbed8c16..f8e58086d 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalObject.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalObject.java @@ -18,6 +18,7 @@ package org.redisson.transaction; import org.redisson.RedissonMultiLock; import org.redisson.api.RFuture; import org.redisson.api.RLock; +import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.CompletableFutureWrapper; import java.util.List; @@ -32,6 +33,16 @@ import java.util.function.Supplier; */ public class BaseTransactionalObject { + final String transactionId; + final String lockName; + final CommandAsyncExecutor commandExecutor; + + public BaseTransactionalObject(String transactionId, String lockName, CommandAsyncExecutor commandExecutor) { + this.transactionId = transactionId; + this.lockName = lockName; + this.commandExecutor = commandExecutor; + } + public RFuture moveAsync(int database) { throw new UnsupportedOperationException("move method is not supported in transaction"); } @@ -40,6 +51,18 @@ public class BaseTransactionalObject { throw new UnsupportedOperationException("migrate method is not supported in transaction"); } + protected RLock getWriteLock() { + return new RedissonTransactionalWriteLock(commandExecutor, lockName, transactionId); + } + + protected RLock getReadLock() { + return new RedissonTransactionalReadLock(commandExecutor, lockName, transactionId); + } + + protected static String getLockName(String name) { + return name + ":transaction_lock"; + } + protected RFuture executeLocked(long timeout, Supplier> runnable, RLock lock) { return executeLocked(Thread.currentThread().getId(), timeout, runnable, lock); } diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java index 744b17cdf..2ada5bb03 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java @@ -46,26 +46,23 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { static final Object NULL = new Object(); private final long timeout; - final Map state = new HashMap(); + final Map state = new HashMap<>(); final List operations; final RCollectionAsync set; final RObject object; final String name; - final CommandAsyncExecutor commandExecutor; Boolean deleted; - String transactionId; boolean hasExpiration; public BaseTransactionalSet(CommandAsyncExecutor commandExecutor, long timeout, List operations, RCollectionAsync set, String transactionId) { - this.commandExecutor = commandExecutor; + super(transactionId, getLockName(((RObject) set).getName()), commandExecutor); this.timeout = timeout; this.operations = operations; this.set = set; this.object = (RObject) set; this.name = object.getName(); - this.transactionId = transactionId; } private HashValue toHash(Object value) { @@ -85,36 +82,32 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { return set.isExistsAsync(); } - protected String getLockName() { - return name + ":transaction_lock"; - } - public RFuture unlinkAsync() { long currentThreadId = Thread.currentThread().getId(); - return deleteAsync(new UnlinkOperation(name, null, getLockName(), currentThreadId, transactionId)); + return deleteAsync(new UnlinkOperation(name, null, lockName, currentThreadId, transactionId)); } public RFuture touchAsync() { long currentThreadId = Thread.currentThread().getId(); return executeLocked(timeout, () -> { if (deleted != null && deleted) { - operations.add(new TouchOperation(name, null, getLockName(), currentThreadId, transactionId)); + operations.add(new TouchOperation(name, null, lockName, currentThreadId, transactionId)); return new CompletableFutureWrapper<>(false); } return set.isExistsAsync().thenApply(exists -> { - operations.add(new TouchOperation(name, null, getLockName(), currentThreadId, transactionId)); + operations.add(new TouchOperation(name, null, lockName, currentThreadId, transactionId)); if (!exists) { return isExists(); } - return exists; + return true; }); }, getWriteLock()); } public RFuture deleteAsync() { long currentThreadId = Thread.currentThread().getId(); - return deleteAsync(new DeleteOperation(name, null, getLockName(), transactionId, currentThreadId)); + return deleteAsync(new DeleteOperation(name, null, lockName, transactionId, currentThreadId)); } protected RFuture deleteAsync(TransactionalOperation operation) { @@ -429,14 +422,6 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { } } - private RLock getWriteLock() { - return new RedissonTransactionalWriteLock(commandExecutor, getLockName(), transactionId); - } - - private RLock getReadLock() { - return new RedissonTransactionalReadLock(commandExecutor, getLockName(), transactionId); - } - protected RFuture executeLocked(Object value, Supplier> runnable) { RLock lock = getLock(set, (V) value); long threadId = Thread.currentThread().getId(); @@ -458,13 +443,13 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { long currentThreadId = Thread.currentThread().getId(); return executeLocked(timeout, () -> { if (hasExpiration) { - operations.add(new ClearExpireOperation(name, null, getLockName(), currentThreadId, transactionId)); + operations.add(new ClearExpireOperation(name, null, lockName, currentThreadId, transactionId)); hasExpiration = false; return CompletableFuture.completedFuture(true); } return set.remainTimeToLiveAsync().thenApply(res -> { - operations.add(new ClearExpireOperation(name, null, getLockName(), currentThreadId, transactionId)); + operations.add(new ClearExpireOperation(name, null, lockName, currentThreadId, transactionId)); hasExpiration = false; return res > 0; }); @@ -480,13 +465,13 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { long currentThreadId = Thread.currentThread().getId(); return executeLocked(timeout, () -> { if (isExists()) { - operations.add(new ExpireOperation(name, null, getLockName(), currentThreadId, transactionId, timeToLive, timeUnit, param, keys)); + operations.add(new ExpireOperation(name, null, lockName, currentThreadId, transactionId, timeToLive, timeUnit, param, keys)); hasExpiration = true; return CompletableFuture.completedFuture(true); } return isExistsAsync().thenApply(res -> { - operations.add(new ExpireOperation(name, null, getLockName(), currentThreadId, transactionId, timeToLive, timeUnit, param, keys)); + operations.add(new ExpireOperation(name, null, lockName, currentThreadId, transactionId, timeToLive, timeUnit, param, keys)); hasExpiration = res; return res; }); @@ -497,13 +482,13 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { long currentThreadId = Thread.currentThread().getId(); return executeLocked(timeout, () -> { if (isExists()) { - operations.add(new ExpireAtOperation(name, null, getLockName(), currentThreadId, transactionId, timestamp, param, keys)); + operations.add(new ExpireAtOperation(name, null, lockName, currentThreadId, transactionId, timestamp, param, keys)); hasExpiration = true; return CompletableFuture.completedFuture(true); } return isExistsAsync().thenApply(res -> { - operations.add(new ExpireAtOperation(name, null, getLockName(), currentThreadId, transactionId, timestamp, param, keys)); + operations.add(new ExpireAtOperation(name, null, lockName, currentThreadId, transactionId, timestamp, param, keys)); hasExpiration = res; return res; }); diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java index cd1e3e494..6b6d943b5 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java @@ -68,17 +68,17 @@ public class RedissonTransactionalMap extends RedissonMap { @Override public RFuture expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) { - throw new UnsupportedOperationException("expire method is not supported in transaction"); + return transactionalMap.expireAsync(timeToLive, timeUnit, param, keys); } @Override protected RFuture expireAtAsync(long timestamp, String param, String... keys) { - throw new UnsupportedOperationException("expire method is not supported in transaction"); + return transactionalMap.expireAtAsync(timestamp, param, keys); } @Override public RFuture clearExpireAsync() { - throw new UnsupportedOperationException("clearExpire method is not supported in transaction"); + return transactionalMap.clearExpireAsync(); } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java index 6ec220ec9..17b9c4206 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java @@ -61,17 +61,17 @@ public class RedissonTransactionalMapCache extends RedissonMapCache @Override public RFuture expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) { - throw new UnsupportedOperationException("expire method is not supported in transaction"); + return transactionalMap.expireAsync(timeToLive, timeUnit, param, keys); } @Override protected RFuture expireAtAsync(long timestamp, String param, String... keys) { - throw new UnsupportedOperationException("expire method is not supported in transaction"); + return transactionalMap.expireAtAsync(timestamp, param, keys); } @Override public RFuture clearExpireAsync() { - throw new UnsupportedOperationException("clearExpire method is not supported in transaction"); + return transactionalMap.clearExpireAsync(); } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java index eb120271b..9f198fba5 100644 --- a/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java @@ -60,7 +60,7 @@ public class TransactionalSet extends BaseTransactionalSet { @Override protected TransactionalOperation createAddOperation(V value, long threadId) { - return new AddOperation(set, value, getLockName(), transactionId, threadId); + return new AddOperation(set, value, lockName, transactionId, threadId); } @Override @@ -70,7 +70,7 @@ public class TransactionalSet extends BaseTransactionalSet { @Override protected TransactionalOperation createRemoveOperation(Object value, long threadId) { - return new RemoveOperation(set, value, getLockName(), transactionId, threadId); + return new RemoveOperation(set, value, lockName, transactionId, threadId); } }