Feature - RedissonTransactionalMap and RedissonTransactionalMapCache support expire(), expireAt() and clearExpire() methods. #4249

pull/4428/head
Nikita Koksharov 3 years ago
parent 5bb53a4c4a
commit 46cf07ba27

@ -29,10 +29,7 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.Hash; import org.redisson.misc.Hash;
import org.redisson.misc.HashValue; import org.redisson.misc.HashValue;
import org.redisson.transaction.operation.DeleteOperation; import org.redisson.transaction.operation.*;
import org.redisson.transaction.operation.TouchOperation;
import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.UnlinkOperation;
import org.redisson.transaction.operation.map.*; import org.redisson.transaction.operation.map.*;
import java.math.BigDecimal; import java.math.BigDecimal;
@ -40,6 +37,7 @@ import java.util.*;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -80,17 +78,14 @@ public class BaseTransactionalMap<K, V> extends BaseTransactionalObject {
final List<TransactionalOperation> operations; final List<TransactionalOperation> operations;
final Map<HashValue, MapEntry> state = new HashMap<>(); final Map<HashValue, MapEntry> state = new HashMap<>();
final RMap<K, V> map; final RMap<K, V> map;
final CommandAsyncExecutor commandExecutor;
final String transactionId;
Boolean deleted; Boolean deleted;
boolean hasExpiration;
public BaseTransactionalMap(CommandAsyncExecutor commandExecutor, long timeout, List<TransactionalOperation> operations, RMap<K, V> map, String transactionId) { public BaseTransactionalMap(CommandAsyncExecutor commandExecutor, long timeout, List<TransactionalOperation> operations, RMap<K, V> map, String transactionId) {
super(); super(transactionId, getLockName(map.getName()), commandExecutor);
this.timeout = timeout; this.timeout = timeout;
this.operations = operations; this.operations = operations;
this.map = map; this.map = map;
this.commandExecutor = commandExecutor;
this.transactionId = transactionId;
} }
HashValue toKeyHash(Object key) { HashValue toKeyHash(Object key) {
@ -123,14 +118,9 @@ public class BaseTransactionalMap<K, V> extends BaseTransactionalObject {
CompletionStage<Boolean> f = map.isExistsAsync().thenApply(exists -> { CompletionStage<Boolean> f = map.isExistsAsync().thenApply(exists -> {
operations.add(new TouchOperation(map.getName())); operations.add(new TouchOperation(map.getName()));
if (!exists) { if (!exists) {
for (MapEntry entry : state.values()) { return isExists();
if (entry != MapEntry.NULL) {
exists = true;
break;
}
} }
} return true;
return exists;
}); });
return new CompletableFutureWrapper<>(f); return new CompletableFutureWrapper<>(f);
} }
@ -775,4 +765,60 @@ public class BaseTransactionalMap<K, V> extends BaseTransactionalObject {
return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); return new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
} }
private boolean isExists() {
boolean notExists = state.values().stream().noneMatch(v -> v != MapEntry.NULL);
return !notExists;
}
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
long currentThreadId = Thread.currentThread().getId();
return executeLocked(timeout, () -> {
if (isExists()) {
operations.add(new ExpireOperation(map.getName(), null, lockName, currentThreadId, transactionId, timeToLive, timeUnit, param, keys));
hasExpiration = true;
return CompletableFuture.completedFuture(true);
}
return isExistsAsync().thenApply(res -> {
operations.add(new ExpireOperation(map.getName(), null, lockName, currentThreadId, transactionId, timeToLive, timeUnit, param, keys));
hasExpiration = res;
return res;
});
}, getWriteLock());
}
public RFuture<Boolean> expireAtAsync(long timestamp, String param, String... keys) {
long currentThreadId = Thread.currentThread().getId();
return executeLocked(timeout, () -> {
if (isExists()) {
operations.add(new ExpireAtOperation(map.getName(), null, lockName, currentThreadId, transactionId, timestamp, param, keys));
hasExpiration = true;
return CompletableFuture.completedFuture(true);
}
return isExistsAsync().thenApply(res -> {
operations.add(new ExpireAtOperation(map.getName(), null, lockName, currentThreadId, transactionId, timestamp, param, keys));
hasExpiration = res;
return res;
});
}, getWriteLock());
}
public RFuture<Boolean> clearExpireAsync() {
long currentThreadId = Thread.currentThread().getId();
return executeLocked(timeout, () -> {
if (hasExpiration) {
operations.add(new ClearExpireOperation(map.getName(), null, lockName, currentThreadId, transactionId));
hasExpiration = false;
return CompletableFuture.completedFuture(true);
}
return map.remainTimeToLiveAsync().thenApply(res -> {
operations.add(new ClearExpireOperation(map.getName(), null, lockName, currentThreadId, transactionId));
hasExpiration = false;
return res > 0;
});
}, getWriteLock());
}
} }

@ -18,6 +18,7 @@ package org.redisson.transaction;
import org.redisson.RedissonMultiLock; import org.redisson.RedissonMultiLock;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.CompletableFutureWrapper;
import java.util.List; import java.util.List;
@ -32,6 +33,16 @@ import java.util.function.Supplier;
*/ */
public class BaseTransactionalObject { public class BaseTransactionalObject {
final String transactionId;
final String lockName;
final CommandAsyncExecutor commandExecutor;
public BaseTransactionalObject(String transactionId, String lockName, CommandAsyncExecutor commandExecutor) {
this.transactionId = transactionId;
this.lockName = lockName;
this.commandExecutor = commandExecutor;
}
public RFuture<Boolean> moveAsync(int database) { public RFuture<Boolean> moveAsync(int database) {
throw new UnsupportedOperationException("move method is not supported in transaction"); throw new UnsupportedOperationException("move method is not supported in transaction");
} }
@ -40,6 +51,18 @@ public class BaseTransactionalObject {
throw new UnsupportedOperationException("migrate method is not supported in transaction"); throw new UnsupportedOperationException("migrate method is not supported in transaction");
} }
protected RLock getWriteLock() {
return new RedissonTransactionalWriteLock(commandExecutor, lockName, transactionId);
}
protected RLock getReadLock() {
return new RedissonTransactionalReadLock(commandExecutor, lockName, transactionId);
}
protected static String getLockName(String name) {
return name + ":transaction_lock";
}
protected <R> RFuture<R> executeLocked(long timeout, Supplier<CompletionStage<R>> runnable, RLock lock) { protected <R> RFuture<R> executeLocked(long timeout, Supplier<CompletionStage<R>> runnable, RLock lock) {
return executeLocked(Thread.currentThread().getId(), timeout, runnable, lock); return executeLocked(Thread.currentThread().getId(), timeout, runnable, lock);
} }

@ -46,26 +46,23 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
static final Object NULL = new Object(); static final Object NULL = new Object();
private final long timeout; private final long timeout;
final Map<HashValue, Object> state = new HashMap<HashValue, Object>(); final Map<HashValue, Object> state = new HashMap<>();
final List<TransactionalOperation> operations; final List<TransactionalOperation> operations;
final RCollectionAsync<V> set; final RCollectionAsync<V> set;
final RObject object; final RObject object;
final String name; final String name;
final CommandAsyncExecutor commandExecutor;
Boolean deleted; Boolean deleted;
String transactionId;
boolean hasExpiration; boolean hasExpiration;
public BaseTransactionalSet(CommandAsyncExecutor commandExecutor, long timeout, List<TransactionalOperation> operations, public BaseTransactionalSet(CommandAsyncExecutor commandExecutor, long timeout, List<TransactionalOperation> operations,
RCollectionAsync<V> set, String transactionId) { RCollectionAsync<V> set, String transactionId) {
this.commandExecutor = commandExecutor; super(transactionId, getLockName(((RObject) set).getName()), commandExecutor);
this.timeout = timeout; this.timeout = timeout;
this.operations = operations; this.operations = operations;
this.set = set; this.set = set;
this.object = (RObject) set; this.object = (RObject) set;
this.name = object.getName(); this.name = object.getName();
this.transactionId = transactionId;
} }
private HashValue toHash(Object value) { private HashValue toHash(Object value) {
@ -85,36 +82,32 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
return set.isExistsAsync(); return set.isExistsAsync();
} }
protected String getLockName() {
return name + ":transaction_lock";
}
public RFuture<Boolean> unlinkAsync() { public RFuture<Boolean> unlinkAsync() {
long currentThreadId = Thread.currentThread().getId(); long currentThreadId = Thread.currentThread().getId();
return deleteAsync(new UnlinkOperation(name, null, getLockName(), currentThreadId, transactionId)); return deleteAsync(new UnlinkOperation(name, null, lockName, currentThreadId, transactionId));
} }
public RFuture<Boolean> touchAsync() { public RFuture<Boolean> touchAsync() {
long currentThreadId = Thread.currentThread().getId(); long currentThreadId = Thread.currentThread().getId();
return executeLocked(timeout, () -> { return executeLocked(timeout, () -> {
if (deleted != null && deleted) { if (deleted != null && deleted) {
operations.add(new TouchOperation(name, null, getLockName(), currentThreadId, transactionId)); operations.add(new TouchOperation(name, null, lockName, currentThreadId, transactionId));
return new CompletableFutureWrapper<>(false); return new CompletableFutureWrapper<>(false);
} }
return set.isExistsAsync().thenApply(exists -> { return set.isExistsAsync().thenApply(exists -> {
operations.add(new TouchOperation(name, null, getLockName(), currentThreadId, transactionId)); operations.add(new TouchOperation(name, null, lockName, currentThreadId, transactionId));
if (!exists) { if (!exists) {
return isExists(); return isExists();
} }
return exists; return true;
}); });
}, getWriteLock()); }, getWriteLock());
} }
public RFuture<Boolean> deleteAsync() { public RFuture<Boolean> deleteAsync() {
long currentThreadId = Thread.currentThread().getId(); long currentThreadId = Thread.currentThread().getId();
return deleteAsync(new DeleteOperation(name, null, getLockName(), transactionId, currentThreadId)); return deleteAsync(new DeleteOperation(name, null, lockName, transactionId, currentThreadId));
} }
protected RFuture<Boolean> deleteAsync(TransactionalOperation operation) { protected RFuture<Boolean> deleteAsync(TransactionalOperation operation) {
@ -429,14 +422,6 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
} }
} }
private RLock getWriteLock() {
return new RedissonTransactionalWriteLock(commandExecutor, getLockName(), transactionId);
}
private RLock getReadLock() {
return new RedissonTransactionalReadLock(commandExecutor, getLockName(), transactionId);
}
protected <R> RFuture<R> executeLocked(Object value, Supplier<CompletionStage<R>> runnable) { protected <R> RFuture<R> executeLocked(Object value, Supplier<CompletionStage<R>> runnable) {
RLock lock = getLock(set, (V) value); RLock lock = getLock(set, (V) value);
long threadId = Thread.currentThread().getId(); long threadId = Thread.currentThread().getId();
@ -458,13 +443,13 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
long currentThreadId = Thread.currentThread().getId(); long currentThreadId = Thread.currentThread().getId();
return executeLocked(timeout, () -> { return executeLocked(timeout, () -> {
if (hasExpiration) { if (hasExpiration) {
operations.add(new ClearExpireOperation(name, null, getLockName(), currentThreadId, transactionId)); operations.add(new ClearExpireOperation(name, null, lockName, currentThreadId, transactionId));
hasExpiration = false; hasExpiration = false;
return CompletableFuture.completedFuture(true); return CompletableFuture.completedFuture(true);
} }
return set.remainTimeToLiveAsync().thenApply(res -> { return set.remainTimeToLiveAsync().thenApply(res -> {
operations.add(new ClearExpireOperation(name, null, getLockName(), currentThreadId, transactionId)); operations.add(new ClearExpireOperation(name, null, lockName, currentThreadId, transactionId));
hasExpiration = false; hasExpiration = false;
return res > 0; return res > 0;
}); });
@ -480,13 +465,13 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
long currentThreadId = Thread.currentThread().getId(); long currentThreadId = Thread.currentThread().getId();
return executeLocked(timeout, () -> { return executeLocked(timeout, () -> {
if (isExists()) { if (isExists()) {
operations.add(new ExpireOperation(name, null, getLockName(), currentThreadId, transactionId, timeToLive, timeUnit, param, keys)); operations.add(new ExpireOperation(name, null, lockName, currentThreadId, transactionId, timeToLive, timeUnit, param, keys));
hasExpiration = true; hasExpiration = true;
return CompletableFuture.completedFuture(true); return CompletableFuture.completedFuture(true);
} }
return isExistsAsync().thenApply(res -> { return isExistsAsync().thenApply(res -> {
operations.add(new ExpireOperation(name, null, getLockName(), currentThreadId, transactionId, timeToLive, timeUnit, param, keys)); operations.add(new ExpireOperation(name, null, lockName, currentThreadId, transactionId, timeToLive, timeUnit, param, keys));
hasExpiration = res; hasExpiration = res;
return res; return res;
}); });
@ -497,13 +482,13 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
long currentThreadId = Thread.currentThread().getId(); long currentThreadId = Thread.currentThread().getId();
return executeLocked(timeout, () -> { return executeLocked(timeout, () -> {
if (isExists()) { if (isExists()) {
operations.add(new ExpireAtOperation(name, null, getLockName(), currentThreadId, transactionId, timestamp, param, keys)); operations.add(new ExpireAtOperation(name, null, lockName, currentThreadId, transactionId, timestamp, param, keys));
hasExpiration = true; hasExpiration = true;
return CompletableFuture.completedFuture(true); return CompletableFuture.completedFuture(true);
} }
return isExistsAsync().thenApply(res -> { return isExistsAsync().thenApply(res -> {
operations.add(new ExpireAtOperation(name, null, getLockName(), currentThreadId, transactionId, timestamp, param, keys)); operations.add(new ExpireAtOperation(name, null, lockName, currentThreadId, transactionId, timestamp, param, keys));
hasExpiration = res; hasExpiration = res;
return res; return res;
}); });

@ -68,17 +68,17 @@ public class RedissonTransactionalMap<K, V> extends RedissonMap<K, V> {
@Override @Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) { public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
throw new UnsupportedOperationException("expire method is not supported in transaction"); return transactionalMap.expireAsync(timeToLive, timeUnit, param, keys);
} }
@Override @Override
protected RFuture<Boolean> expireAtAsync(long timestamp, String param, String... keys) { protected RFuture<Boolean> expireAtAsync(long timestamp, String param, String... keys) {
throw new UnsupportedOperationException("expire method is not supported in transaction"); return transactionalMap.expireAtAsync(timestamp, param, keys);
} }
@Override @Override
public RFuture<Boolean> clearExpireAsync() { public RFuture<Boolean> clearExpireAsync() {
throw new UnsupportedOperationException("clearExpire method is not supported in transaction"); return transactionalMap.clearExpireAsync();
} }
@Override @Override

@ -61,17 +61,17 @@ public class RedissonTransactionalMapCache<K, V> extends RedissonMapCache<K, V>
@Override @Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) { public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
throw new UnsupportedOperationException("expire method is not supported in transaction"); return transactionalMap.expireAsync(timeToLive, timeUnit, param, keys);
} }
@Override @Override
protected RFuture<Boolean> expireAtAsync(long timestamp, String param, String... keys) { protected RFuture<Boolean> expireAtAsync(long timestamp, String param, String... keys) {
throw new UnsupportedOperationException("expire method is not supported in transaction"); return transactionalMap.expireAtAsync(timestamp, param, keys);
} }
@Override @Override
public RFuture<Boolean> clearExpireAsync() { public RFuture<Boolean> clearExpireAsync() {
throw new UnsupportedOperationException("clearExpire method is not supported in transaction"); return transactionalMap.clearExpireAsync();
} }
@Override @Override

@ -60,7 +60,7 @@ public class TransactionalSet<V> extends BaseTransactionalSet<V> {
@Override @Override
protected TransactionalOperation createAddOperation(V value, long threadId) { protected TransactionalOperation createAddOperation(V value, long threadId) {
return new AddOperation(set, value, getLockName(), transactionId, threadId); return new AddOperation(set, value, lockName, transactionId, threadId);
} }
@Override @Override
@ -70,7 +70,7 @@ public class TransactionalSet<V> extends BaseTransactionalSet<V> {
@Override @Override
protected TransactionalOperation createRemoveOperation(Object value, long threadId) { protected TransactionalOperation createRemoveOperation(Object value, long threadId) {
return new RemoveOperation(set, value, getLockName(), transactionId, threadId); return new RemoveOperation(set, value, lockName, transactionId, threadId);
} }
} }

Loading…
Cancel
Save