Feature - touch(), unlink() and delete() methods implemented for RedissonTransactionalSetCache and RedissonTransactionalSet objects

pull/4385/head
Nikita Koksharov 3 years ago
parent e6b37c47a9
commit dc4d0729be

@ -63,7 +63,11 @@ public class BaseTransactionalObject {
}
protected <R> RFuture<R> executeLocked(long timeout, Supplier<CompletionStage<R>> runnable, RLock lock) {
CompletionStage<R> f = lock.lockAsync(timeout, TimeUnit.MILLISECONDS).thenCompose(res -> runnable.get());
return executeLocked(Thread.currentThread().getId(), timeout, runnable, lock);
}
protected <R> RFuture<R> executeLocked(long threadId, long timeout, Supplier<CompletionStage<R>> runnable, RLock lock) {
CompletionStage<R> f = lock.lockAsync(timeout, TimeUnit.MILLISECONDS, threadId).thenCompose(res -> runnable.get());
return new CompletableFutureWrapper<>(f);
}

@ -55,14 +55,17 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
final String name;
final CommandAsyncExecutor commandExecutor;
Boolean deleted;
String transactionId;
public BaseTransactionalSet(CommandAsyncExecutor commandExecutor, long timeout, List<TransactionalOperation> operations, RCollectionAsync<V> set) {
public BaseTransactionalSet(CommandAsyncExecutor commandExecutor, long timeout, List<TransactionalOperation> operations,
RCollectionAsync<V> set, String transactionId) {
this.commandExecutor = commandExecutor;
this.timeout = timeout;
this.operations = operations;
this.set = set;
this.object = (RObject) set;
this.name = object.getName();
this.transactionId = transactionId;
}
private HashValue toHash(Object value) {
@ -82,51 +85,56 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
return set.isExistsAsync();
}
public RFuture<Boolean> unlinkAsync(CommandAsyncExecutor commandExecutor) {
return deleteAsync(commandExecutor, new UnlinkOperation(name));
protected String getLockName() {
return name + ":transaction_lock";
}
public RFuture<Boolean> touchAsync(CommandAsyncExecutor commandExecutor) {
public RFuture<Boolean> unlinkAsync() {
long currentThreadId = Thread.currentThread().getId();
return deleteAsync(new UnlinkOperation(name, null, getLockName(), currentThreadId, transactionId));
}
public RFuture<Boolean> touchAsync() {
long currentThreadId = Thread.currentThread().getId();
return executeLocked(timeout, () -> {
if (deleted != null && deleted) {
operations.add(new TouchOperation(name));
operations.add(new TouchOperation(name, null, getLockName(), currentThreadId, transactionId));
return new CompletableFutureWrapper<>(false);
}
CompletionStage<Boolean> f = set.isExistsAsync().thenApply(exists -> {
operations.add(new TouchOperation(name));
return set.isExistsAsync().thenApply(exists -> {
operations.add(new TouchOperation(name, null, getLockName(), currentThreadId, transactionId));
if (!exists) {
for (Object value : state.values()) {
if (value != NULL) {
exists = true;
break;
}
}
boolean notExists = state.values().stream().noneMatch(v -> v != NULL);
return !notExists;
}
return exists;
});
return new CompletableFutureWrapper<>(f);
}, getWriteLock());
}
public RFuture<Boolean> deleteAsync(CommandAsyncExecutor commandExecutor) {
return deleteAsync(commandExecutor, new DeleteOperation(name));
public RFuture<Boolean> deleteAsync() {
long currentThreadId = Thread.currentThread().getId();
return deleteAsync(new DeleteOperation(name, null, getLockName(), transactionId, currentThreadId));
}
protected RFuture<Boolean> deleteAsync(CommandAsyncExecutor commandExecutor, TransactionalOperation operation) {
protected RFuture<Boolean> deleteAsync(TransactionalOperation operation) {
return executeLocked(timeout, () -> {
if (deleted != null) {
operations.add(operation);
CompletableFuture<Boolean> result = new CompletableFuture<>();
result.complete(!deleted);
deleted = true;
return new CompletableFutureWrapper<>(result);
return result;
}
CompletionStage<Boolean> f = set.isExistsAsync().thenApply(res -> {
return set.isExistsAsync().thenApply(res -> {
operations.add(operation);
state.replaceAll((k, v) -> NULL);
deleted = true;
return res;
});
return new CompletableFutureWrapper<>(f);
}, getWriteLock());
}
public RFuture<Boolean> containsAsync(Object value) {
@ -240,7 +248,7 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
List<RLock> locks = Arrays.asList(destinationLock, lock);
long threadId = Thread.currentThread().getId();
return executeLocked(() -> {
return executeLocked(timeout, () -> {
HashValue keyHash = toHash(value);
Object currentValue = state.get(keyHash);
if (currentValue != null) {
@ -264,7 +272,10 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
protected abstract MoveOperation createMoveOperation(String destination, V value, long threadId);
protected abstract RLock getLock(RCollectionAsync<V> set, V value);
private RLock getLock(RCollectionAsync<V> set, V value) {
String lockName = ((RedissonObject) set).getLockByValue(value, "lock");
return new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
}
public RFuture<Boolean> removeAsync(Object value) {
long threadId = Thread.currentThread().getId();
@ -419,9 +430,20 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
}
}
private RLock getWriteLock() {
return new RedissonTransactionalWriteLock(commandExecutor, getLockName(), transactionId);
}
private RLock getReadLock() {
return new RedissonTransactionalReadLock(commandExecutor, getLockName(), transactionId);
}
protected <R> RFuture<R> executeLocked(Object value, Supplier<CompletionStage<R>> runnable) {
RLock lock = getLock(set, (V) value);
return executeLocked(timeout, runnable, lock);
long threadId = Thread.currentThread().getId();
return executeLocked(threadId, timeout, () -> {
return executeLocked(threadId, timeout, runnable, lock);
}, getReadLock());
}
protected <R> RFuture<R> executeLocked(Supplier<CompletionStage<R>> runnable, Collection<?> values) {

@ -0,0 +1,40 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.transaction;
import org.redisson.RedissonReadLock;
import org.redisson.command.CommandAsyncExecutor;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonTransactionalReadLock extends RedissonReadLock {
private final String transactionId;
public RedissonTransactionalReadLock(CommandAsyncExecutor commandExecutor, String name, String transactionId) {
super(commandExecutor, name);
this.transactionId = transactionId;
}
@Override
protected String getLockName(long threadId) {
return super.getLockName(threadId) + ":" + transactionId;
}
}

@ -280,6 +280,24 @@ public class RedissonTransactionalSet<V> extends RedissonSet<V> {
return transactionalSet.readIntersectionAsync(names);
}
@Override
public RFuture<Boolean> unlinkAsync() {
checkState();
return transactionalSet.unlinkAsync();
}
@Override
public RFuture<Boolean> touchAsync() {
checkState();
return transactionalSet.touchAsync();
}
@Override
public RFuture<Boolean> deleteAsync() {
checkState();
return transactionalSet.deleteAsync();
}
protected void checkState() {
if (executed.get()) {
throw new IllegalStateException("Unable to execute operation. Transaction is in finished state!");

@ -142,6 +142,24 @@ public class RedissonTransactionalSetCache<V> extends RedissonSetCache<V> {
return transactionalSet.removeAllAsync(c);
}
@Override
public RFuture<Boolean> unlinkAsync() {
checkState();
return transactionalSet.unlinkAsync();
}
@Override
public RFuture<Boolean> touchAsync() {
checkState();
return transactionalSet.touchAsync();
}
@Override
public RFuture<Boolean> deleteAsync() {
checkState();
return transactionalSet.deleteAsync();
}
protected void checkState() {
if (executed.get()) {
throw new IllegalStateException("Unable to execute operation. Transaction is in finished state!");

@ -0,0 +1,40 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.transaction;
import org.redisson.RedissonWriteLock;
import org.redisson.command.CommandAsyncExecutor;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonTransactionalWriteLock extends RedissonWriteLock {
private final String transactionId;
public RedissonTransactionalWriteLock(CommandAsyncExecutor commandExecutor, String name, String transactionId) {
super(commandExecutor, name);
this.transactionId = transactionId;
}
@Override
protected String getLockName(long threadId) {
return super.getLockName(threadId) + ":" + transactionId;
}
}

@ -15,12 +15,9 @@
*/
package org.redisson.transaction;
import org.redisson.RedissonSet;
import org.redisson.ScanIterator;
import org.redisson.ScanResult;
import org.redisson.api.RCollectionAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RSet;
import org.redisson.client.RedisClient;
import org.redisson.command.CommandAsyncExecutor;
@ -45,7 +42,7 @@ public class TransactionalSet<V> extends BaseTransactionalSet<V> {
public TransactionalSet(CommandAsyncExecutor commandExecutor, long timeout, List<TransactionalOperation> operations,
RSet<V> set, String transactionId) {
super(commandExecutor, timeout, operations, set);
super(commandExecutor, timeout, operations, set, transactionId);
this.set = set;
this.transactionId = transactionId;
}
@ -63,7 +60,7 @@ public class TransactionalSet<V> extends BaseTransactionalSet<V> {
@Override
protected TransactionalOperation createAddOperation(V value, long threadId) {
return new AddOperation(set, value, transactionId, threadId);
return new AddOperation(set, value, getLockName(), transactionId, threadId);
}
@Override
@ -73,13 +70,7 @@ public class TransactionalSet<V> extends BaseTransactionalSet<V> {
@Override
protected TransactionalOperation createRemoveOperation(Object value, long threadId) {
return new RemoveOperation(set, value, transactionId, threadId);
}
@Override
protected RLock getLock(RCollectionAsync<V> set, V value) {
String lockName = ((RedissonSet<V>) set).getLockByValue(value, "lock");
return new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
return new RemoveOperation(set, value, getLockName(), transactionId, threadId);
}
}

@ -15,12 +15,9 @@
*/
package org.redisson.transaction;
import org.redisson.RedissonSetCache;
import org.redisson.ScanIterator;
import org.redisson.ScanResult;
import org.redisson.api.RCollectionAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RSetCache;
import org.redisson.client.RedisClient;
import org.redisson.command.CommandAsyncExecutor;
@ -46,7 +43,7 @@ public class TransactionalSetCache<V> extends BaseTransactionalSet<V> {
public TransactionalSetCache(CommandAsyncExecutor commandExecutor, long timeout, List<TransactionalOperation> operations,
RSetCache<V> set, String transactionId) {
super(commandExecutor, timeout, operations, set);
super(commandExecutor, timeout, operations, set, transactionId);
this.set = set;
this.transactionId = transactionId;
}
@ -82,10 +79,4 @@ public class TransactionalSetCache<V> extends BaseTransactionalSet<V> {
return new RemoveCacheOperation(set, value, transactionId, threadId);
}
@Override
protected RLock getLock(RCollectionAsync<V> set, V value) {
String lockName = ((RedissonSetCache<V>) set).getLockByValue(value, "lock");
return new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
}
}

@ -20,6 +20,7 @@ import org.redisson.RedissonLock;
import org.redisson.api.RKeys;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
import org.redisson.transaction.RedissonTransactionalWriteLock;
/**
*
@ -28,6 +29,7 @@ import org.redisson.transaction.RedissonTransactionalLock;
*/
public class DeleteOperation extends TransactionalOperation {
private String writeLockName;
private String lockName;
private String transactionId;
@ -41,6 +43,11 @@ public class DeleteOperation extends TransactionalOperation {
this.transactionId = transactionId;
}
public DeleteOperation(String name, String lockName, String writeLockName, String transactionId, long threadId) {
this(name, lockName, transactionId, threadId);
this.writeLockName = writeLockName;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RKeys keys = new RedissonKeys(commandExecutor);
@ -49,6 +56,10 @@ public class DeleteOperation extends TransactionalOperation {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}
if (writeLockName != null) {
RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
@Override
@ -57,6 +68,10 @@ public class DeleteOperation extends TransactionalOperation {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}
if (writeLockName != null) {
RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
public String getLockName() {

@ -20,6 +20,7 @@ import org.redisson.RedissonLock;
import org.redisson.api.RKeys;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
import org.redisson.transaction.RedissonTransactionalWriteLock;
/**
*
@ -28,6 +29,7 @@ import org.redisson.transaction.RedissonTransactionalLock;
*/
public class TouchOperation extends TransactionalOperation {
private String writeLockName;
private String lockName;
private String transactionId;
@ -41,6 +43,11 @@ public class TouchOperation extends TransactionalOperation {
this.transactionId = transactionId;
}
public TouchOperation(String name, String lockName, String writeLockName, long threadId, String transactionId) {
this(name, lockName, threadId, transactionId);
this.writeLockName = writeLockName;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RKeys keys = new RedissonKeys(commandExecutor);
@ -49,6 +56,10 @@ public class TouchOperation extends TransactionalOperation {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
if (writeLockName != null) {
RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
@Override
@ -57,6 +68,10 @@ public class TouchOperation extends TransactionalOperation {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
if (writeLockName != null) {
RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
public String getLockName() {

@ -20,6 +20,7 @@ import org.redisson.RedissonLock;
import org.redisson.api.RKeys;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
import org.redisson.transaction.RedissonTransactionalWriteLock;
/**
*
@ -28,6 +29,7 @@ import org.redisson.transaction.RedissonTransactionalLock;
*/
public class UnlinkOperation extends TransactionalOperation {
private String writeLockName;
private String lockName;
private String transactionId;
@ -41,6 +43,11 @@ public class UnlinkOperation extends TransactionalOperation {
this.transactionId = transactionId;
}
public UnlinkOperation(String name, String lockName, String writeLockName, long threadId, String transactionId) {
this(name, lockName, threadId, transactionId);
this.writeLockName = writeLockName;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RKeys keys = new RedissonKeys(commandExecutor);
@ -49,6 +56,10 @@ public class UnlinkOperation extends TransactionalOperation {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
if (writeLockName != null) {
RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
@Override
@ -57,6 +68,10 @@ public class UnlinkOperation extends TransactionalOperation {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
if (writeLockName != null) {
RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
public String getLockName() {

@ -28,28 +28,32 @@ import org.redisson.command.CommandAsyncExecutor;
*/
public class AddOperation extends SetOperation {
private String readLockName;
private Object value;
public AddOperation(RObject set, Object value, String transactionId, long threadId) {
this(set.getName(), set.getCodec(), value, transactionId, threadId);
public AddOperation(RObject set, Object value, String readLockName, String transactionId, long threadId) {
this(set.getName(), set.getCodec(), readLockName, value, transactionId, threadId);
}
public AddOperation(String name, Codec codec, Object value, String transactionId, long threadId) {
public AddOperation(String name, Codec codec, String readLockName, Object value, String transactionId, long threadId) {
super(name, codec, transactionId, threadId);
this.value = value;
this.readLockName = readLockName;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RSet<Object> set = new RedissonSet<Object>(codec, commandExecutor, name, null);
RSet<Object> set = new RedissonSet<>(codec, commandExecutor, name, null);
set.addAsync(value);
getLock(set, commandExecutor, value).unlockAsync(threadId);
getReadLock(readLockName, commandExecutor).unlockAsync(threadId);
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RSet<Object> set = new RedissonSet<Object>(codec, commandExecutor, name, null);
RSet<Object> set = new RedissonSet<>(codec, commandExecutor, name, null);
getLock(set, commandExecutor, value).unlockAsync(threadId);
getReadLock(readLockName, commandExecutor).unlockAsync(threadId);
}
public Object getValue() {

@ -28,15 +28,17 @@ import org.redisson.command.CommandAsyncExecutor;
*/
public class RemoveOperation extends SetOperation {
private String readLockName;
private Object value;
public RemoveOperation(RObject set, Object value, String transactionId, long threadId) {
this(set.getName(), set.getCodec(), value, transactionId, threadId);
public RemoveOperation(RObject set, Object value, String readLockName, String transactionId, long threadId) {
this(set.getName(), set.getCodec(), readLockName, value, transactionId, threadId);
}
public RemoveOperation(String name, Codec codec, Object value, String transactionId, long threadId) {
public RemoveOperation(String name, Codec codec, String readLockName, Object value, String transactionId, long threadId) {
super(name, codec, transactionId, threadId);
this.value = value;
this.readLockName = readLockName;
}
@Override
@ -44,12 +46,14 @@ public class RemoveOperation extends SetOperation {
RSet<Object> set = new RedissonSet<>(codec, commandExecutor, name, null);
set.removeAsync(value);
getLock(set, commandExecutor, value).unlockAsync(threadId);
getReadLock(readLockName, commandExecutor).unlockAsync(threadId);
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RSet<Object> set = new RedissonSet<>(codec, commandExecutor, name, null);
getLock(set, commandExecutor, value).unlockAsync(threadId);
getReadLock(readLockName, commandExecutor).unlockAsync(threadId);
}
public Object getValue() {

@ -23,6 +23,7 @@ import org.redisson.api.RSetCache;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
import org.redisson.transaction.RedissonTransactionalReadLock;
import org.redisson.transaction.operation.TransactionalOperation;
/**
@ -54,4 +55,8 @@ public abstract class SetOperation extends TransactionalOperation {
return new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
}
protected RLock getReadLock(String readLockName, CommandAsyncExecutor commandExecutor) {
return new RedissonTransactionalReadLock(commandExecutor, readLockName, transactionId);
}
}

Loading…
Cancel
Save