From dc4d0729bea2866a2b4d8938a557a96643df0491 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 24 Jun 2022 08:24:36 +0300 Subject: [PATCH] Feature - touch(), unlink() and delete() methods implemented for RedissonTransactionalSetCache and RedissonTransactionalSet objects --- .../transaction/BaseTransactionalObject.java | 6 +- .../transaction/BaseTransactionalSet.java | 110 +++++++++++------- .../RedissonTransactionalReadLock.java | 40 +++++++ .../transaction/RedissonTransactionalSet.java | 20 +++- .../RedissonTransactionalSetCache.java | 20 +++- .../RedissonTransactionalWriteLock.java | 40 +++++++ .../transaction/TransactionalSet.java | 15 +-- .../transaction/TransactionalSetCache.java | 11 +- .../operation/DeleteOperation.java | 15 +++ .../transaction/operation/TouchOperation.java | 15 +++ .../operation/UnlinkOperation.java | 15 +++ .../operation/set/AddOperation.java | 14 ++- .../operation/set/RemoveOperation.java | 10 +- .../operation/set/SetOperation.java | 5 + 14 files changed, 259 insertions(+), 77 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/transaction/RedissonTransactionalReadLock.java create mode 100644 redisson/src/main/java/org/redisson/transaction/RedissonTransactionalWriteLock.java diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalObject.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalObject.java index 7cb1d5398..aedbeb1d2 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalObject.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalObject.java @@ -63,7 +63,11 @@ public class BaseTransactionalObject { } protected RFuture executeLocked(long timeout, Supplier> runnable, RLock lock) { - CompletionStage f = lock.lockAsync(timeout, TimeUnit.MILLISECONDS).thenCompose(res -> runnable.get()); + return executeLocked(Thread.currentThread().getId(), timeout, runnable, lock); + } + + protected RFuture executeLocked(long threadId, long timeout, Supplier> runnable, RLock lock) { + CompletionStage f = lock.lockAsync(timeout, TimeUnit.MILLISECONDS, threadId).thenCompose(res -> runnable.get()); return new CompletableFutureWrapper<>(f); } diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java index 9127fd392..0a71143dc 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java @@ -55,14 +55,17 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { final String name; final CommandAsyncExecutor commandExecutor; Boolean deleted; - - public BaseTransactionalSet(CommandAsyncExecutor commandExecutor, long timeout, List operations, RCollectionAsync set) { + String transactionId; + + public BaseTransactionalSet(CommandAsyncExecutor commandExecutor, long timeout, List operations, + RCollectionAsync set, String transactionId) { this.commandExecutor = commandExecutor; this.timeout = timeout; this.operations = operations; this.set = set; this.object = (RObject) set; this.name = object.getName(); + this.transactionId = transactionId; } private HashValue toHash(Object value) { @@ -81,52 +84,57 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { return set.isExistsAsync(); } - - public RFuture unlinkAsync(CommandAsyncExecutor commandExecutor) { - return deleteAsync(commandExecutor, new UnlinkOperation(name)); + + protected String getLockName() { + return name + ":transaction_lock"; } - - public RFuture touchAsync(CommandAsyncExecutor commandExecutor) { - if (deleted != null && deleted) { - operations.add(new TouchOperation(name)); - return new CompletableFutureWrapper<>(false); - } - CompletionStage f = set.isExistsAsync().thenApply(exists -> { - operations.add(new TouchOperation(name)); - if (!exists) { - for (Object value : state.values()) { - if (value != NULL) { - exists = true; - break; - } - } + public RFuture unlinkAsync() { + long currentThreadId = Thread.currentThread().getId(); + return deleteAsync(new UnlinkOperation(name, null, getLockName(), currentThreadId, transactionId)); + } + + public RFuture touchAsync() { + long currentThreadId = Thread.currentThread().getId(); + return executeLocked(timeout, () -> { + if (deleted != null && deleted) { + operations.add(new TouchOperation(name, null, getLockName(), currentThreadId, transactionId)); + return new CompletableFutureWrapper<>(false); } - return exists; - }); - return new CompletableFutureWrapper<>(f); + + return set.isExistsAsync().thenApply(exists -> { + operations.add(new TouchOperation(name, null, getLockName(), currentThreadId, transactionId)); + if (!exists) { + boolean notExists = state.values().stream().noneMatch(v -> v != NULL); + return !notExists; + } + return exists; + }); + }, getWriteLock()); } - public RFuture deleteAsync(CommandAsyncExecutor commandExecutor) { - return deleteAsync(commandExecutor, new DeleteOperation(name)); + public RFuture deleteAsync() { + long currentThreadId = Thread.currentThread().getId(); + return deleteAsync(new DeleteOperation(name, null, getLockName(), transactionId, currentThreadId)); } - protected RFuture deleteAsync(CommandAsyncExecutor commandExecutor, TransactionalOperation operation) { - if (deleted != null) { - operations.add(operation); - CompletableFuture result = new CompletableFuture<>(); - result.complete(!deleted); - deleted = true; - return new CompletableFutureWrapper<>(result); - } + protected RFuture deleteAsync(TransactionalOperation operation) { + return executeLocked(timeout, () -> { + if (deleted != null) { + operations.add(operation); + CompletableFuture result = new CompletableFuture<>(); + result.complete(!deleted); + deleted = true; + return result; + } - CompletionStage f = set.isExistsAsync().thenApply(res -> { - operations.add(operation); - state.replaceAll((k, v) -> NULL); - deleted = true; - return res; - }); - return new CompletableFutureWrapper<>(f); + return set.isExistsAsync().thenApply(res -> { + operations.add(operation); + state.replaceAll((k, v) -> NULL); + deleted = true; + return res; + }); + }, getWriteLock()); } public RFuture containsAsync(Object value) { @@ -240,7 +248,7 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { List locks = Arrays.asList(destinationLock, lock); long threadId = Thread.currentThread().getId(); - return executeLocked(() -> { + return executeLocked(timeout, () -> { HashValue keyHash = toHash(value); Object currentValue = state.get(keyHash); if (currentValue != null) { @@ -264,7 +272,10 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { protected abstract MoveOperation createMoveOperation(String destination, V value, long threadId); - protected abstract RLock getLock(RCollectionAsync set, V value); + private RLock getLock(RCollectionAsync set, V value) { + String lockName = ((RedissonObject) set).getLockByValue(value, "lock"); + return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); + } public RFuture removeAsync(Object value) { long threadId = Thread.currentThread().getId(); @@ -418,10 +429,21 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { oldValueBuf.readableBytes(); } } - + + private RLock getWriteLock() { + return new RedissonTransactionalWriteLock(commandExecutor, getLockName(), transactionId); + } + + private RLock getReadLock() { + return new RedissonTransactionalReadLock(commandExecutor, getLockName(), transactionId); + } + protected RFuture executeLocked(Object value, Supplier> runnable) { RLock lock = getLock(set, (V) value); - return executeLocked(timeout, runnable, lock); + long threadId = Thread.currentThread().getId(); + return executeLocked(threadId, timeout, () -> { + return executeLocked(threadId, timeout, runnable, lock); + }, getReadLock()); } protected RFuture executeLocked(Supplier> runnable, Collection values) { diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalReadLock.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalReadLock.java new file mode 100644 index 000000000..299c5af8b --- /dev/null +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalReadLock.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2013-2021 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.transaction; + +import org.redisson.RedissonReadLock; +import org.redisson.command.CommandAsyncExecutor; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonTransactionalReadLock extends RedissonReadLock { + + private final String transactionId; + + public RedissonTransactionalReadLock(CommandAsyncExecutor commandExecutor, String name, String transactionId) { + super(commandExecutor, name); + this.transactionId = transactionId; + } + + @Override + protected String getLockName(long threadId) { + return super.getLockName(threadId) + ":" + transactionId; + } + +} diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java index cb17cb6fe..7c6a9c10b 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java @@ -279,7 +279,25 @@ public class RedissonTransactionalSet extends RedissonSet { checkState(); return transactionalSet.readIntersectionAsync(names); } - + + @Override + public RFuture unlinkAsync() { + checkState(); + return transactionalSet.unlinkAsync(); + } + + @Override + public RFuture touchAsync() { + checkState(); + return transactionalSet.touchAsync(); + } + + @Override + public RFuture deleteAsync() { + checkState(); + return transactionalSet.deleteAsync(); + } + protected void checkState() { if (executed.get()) { throw new IllegalStateException("Unable to execute operation. Transaction is in finished state!"); diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSetCache.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSetCache.java index 9af33a8f6..4744dd71e 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSetCache.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSetCache.java @@ -141,7 +141,25 @@ public class RedissonTransactionalSetCache extends RedissonSetCache { checkState(); return transactionalSet.removeAllAsync(c); } - + + @Override + public RFuture unlinkAsync() { + checkState(); + return transactionalSet.unlinkAsync(); + } + + @Override + public RFuture touchAsync() { + checkState(); + return transactionalSet.touchAsync(); + } + + @Override + public RFuture deleteAsync() { + checkState(); + return transactionalSet.deleteAsync(); + } + protected void checkState() { if (executed.get()) { throw new IllegalStateException("Unable to execute operation. Transaction is in finished state!"); diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalWriteLock.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalWriteLock.java new file mode 100644 index 000000000..05d52fd8a --- /dev/null +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalWriteLock.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2013-2021 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.transaction; + +import org.redisson.RedissonWriteLock; +import org.redisson.command.CommandAsyncExecutor; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonTransactionalWriteLock extends RedissonWriteLock { + + private final String transactionId; + + public RedissonTransactionalWriteLock(CommandAsyncExecutor commandExecutor, String name, String transactionId) { + super(commandExecutor, name); + this.transactionId = transactionId; + } + + @Override + protected String getLockName(long threadId) { + return super.getLockName(threadId) + ":" + transactionId; + } + +} diff --git a/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java index 94655c475..eb120271b 100644 --- a/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java @@ -15,12 +15,9 @@ */ package org.redisson.transaction; -import org.redisson.RedissonSet; import org.redisson.ScanIterator; import org.redisson.ScanResult; -import org.redisson.api.RCollectionAsync; import org.redisson.api.RFuture; -import org.redisson.api.RLock; import org.redisson.api.RSet; import org.redisson.client.RedisClient; import org.redisson.command.CommandAsyncExecutor; @@ -45,7 +42,7 @@ public class TransactionalSet extends BaseTransactionalSet { public TransactionalSet(CommandAsyncExecutor commandExecutor, long timeout, List operations, RSet set, String transactionId) { - super(commandExecutor, timeout, operations, set); + super(commandExecutor, timeout, operations, set, transactionId); this.set = set; this.transactionId = transactionId; } @@ -63,7 +60,7 @@ public class TransactionalSet extends BaseTransactionalSet { @Override protected TransactionalOperation createAddOperation(V value, long threadId) { - return new AddOperation(set, value, transactionId, threadId); + return new AddOperation(set, value, getLockName(), transactionId, threadId); } @Override @@ -73,13 +70,7 @@ public class TransactionalSet extends BaseTransactionalSet { @Override protected TransactionalOperation createRemoveOperation(Object value, long threadId) { - return new RemoveOperation(set, value, transactionId, threadId); - } - - @Override - protected RLock getLock(RCollectionAsync set, V value) { - String lockName = ((RedissonSet) set).getLockByValue(value, "lock"); - return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); + return new RemoveOperation(set, value, getLockName(), transactionId, threadId); } } diff --git a/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java b/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java index b3185dcea..94334bbdd 100644 --- a/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java +++ b/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java @@ -15,12 +15,9 @@ */ package org.redisson.transaction; -import org.redisson.RedissonSetCache; import org.redisson.ScanIterator; import org.redisson.ScanResult; -import org.redisson.api.RCollectionAsync; import org.redisson.api.RFuture; -import org.redisson.api.RLock; import org.redisson.api.RSetCache; import org.redisson.client.RedisClient; import org.redisson.command.CommandAsyncExecutor; @@ -46,7 +43,7 @@ public class TransactionalSetCache extends BaseTransactionalSet { public TransactionalSetCache(CommandAsyncExecutor commandExecutor, long timeout, List operations, RSetCache set, String transactionId) { - super(commandExecutor, timeout, operations, set); + super(commandExecutor, timeout, operations, set, transactionId); this.set = set; this.transactionId = transactionId; } @@ -82,10 +79,4 @@ public class TransactionalSetCache extends BaseTransactionalSet { return new RemoveCacheOperation(set, value, transactionId, threadId); } - @Override - protected RLock getLock(RCollectionAsync set, V value) { - String lockName = ((RedissonSetCache) set).getLockByValue(value, "lock"); - return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); - } - } diff --git a/redisson/src/main/java/org/redisson/transaction/operation/DeleteOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/DeleteOperation.java index e04258cd2..218c73f96 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/DeleteOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/DeleteOperation.java @@ -20,6 +20,7 @@ import org.redisson.RedissonLock; import org.redisson.api.RKeys; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.RedissonTransactionalLock; +import org.redisson.transaction.RedissonTransactionalWriteLock; /** * @@ -28,6 +29,7 @@ import org.redisson.transaction.RedissonTransactionalLock; */ public class DeleteOperation extends TransactionalOperation { + private String writeLockName; private String lockName; private String transactionId; @@ -41,6 +43,11 @@ public class DeleteOperation extends TransactionalOperation { this.transactionId = transactionId; } + public DeleteOperation(String name, String lockName, String writeLockName, String transactionId, long threadId) { + this(name, lockName, transactionId, threadId); + this.writeLockName = writeLockName; + } + @Override public void commit(CommandAsyncExecutor commandExecutor) { RKeys keys = new RedissonKeys(commandExecutor); @@ -49,6 +56,10 @@ public class DeleteOperation extends TransactionalOperation { RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); } + if (writeLockName != null) { + RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId); + lock.unlockAsync(getThreadId()); + } } @Override @@ -57,6 +68,10 @@ public class DeleteOperation extends TransactionalOperation { RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); } + if (writeLockName != null) { + RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId); + lock.unlockAsync(getThreadId()); + } } public String getLockName() { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/TouchOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/TouchOperation.java index 8c064f6dd..61858e463 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/TouchOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/TouchOperation.java @@ -20,6 +20,7 @@ import org.redisson.RedissonLock; import org.redisson.api.RKeys; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.RedissonTransactionalLock; +import org.redisson.transaction.RedissonTransactionalWriteLock; /** * @@ -28,6 +29,7 @@ import org.redisson.transaction.RedissonTransactionalLock; */ public class TouchOperation extends TransactionalOperation { + private String writeLockName; private String lockName; private String transactionId; @@ -41,6 +43,11 @@ public class TouchOperation extends TransactionalOperation { this.transactionId = transactionId; } + public TouchOperation(String name, String lockName, String writeLockName, long threadId, String transactionId) { + this(name, lockName, threadId, transactionId); + this.writeLockName = writeLockName; + } + @Override public void commit(CommandAsyncExecutor commandExecutor) { RKeys keys = new RedissonKeys(commandExecutor); @@ -49,6 +56,10 @@ public class TouchOperation extends TransactionalOperation { RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(getThreadId()); } + if (writeLockName != null) { + RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId); + lock.unlockAsync(getThreadId()); + } } @Override @@ -57,6 +68,10 @@ public class TouchOperation extends TransactionalOperation { RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(getThreadId()); } + if (writeLockName != null) { + RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId); + lock.unlockAsync(getThreadId()); + } } public String getLockName() { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/UnlinkOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/UnlinkOperation.java index a50906a99..ce9258c8b 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/UnlinkOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/UnlinkOperation.java @@ -20,6 +20,7 @@ import org.redisson.RedissonLock; import org.redisson.api.RKeys; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.RedissonTransactionalLock; +import org.redisson.transaction.RedissonTransactionalWriteLock; /** * @@ -28,6 +29,7 @@ import org.redisson.transaction.RedissonTransactionalLock; */ public class UnlinkOperation extends TransactionalOperation { + private String writeLockName; private String lockName; private String transactionId; @@ -41,6 +43,11 @@ public class UnlinkOperation extends TransactionalOperation { this.transactionId = transactionId; } + public UnlinkOperation(String name, String lockName, String writeLockName, long threadId, String transactionId) { + this(name, lockName, threadId, transactionId); + this.writeLockName = writeLockName; + } + @Override public void commit(CommandAsyncExecutor commandExecutor) { RKeys keys = new RedissonKeys(commandExecutor); @@ -49,6 +56,10 @@ public class UnlinkOperation extends TransactionalOperation { RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(getThreadId()); } + if (writeLockName != null) { + RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId); + lock.unlockAsync(getThreadId()); + } } @Override @@ -57,6 +68,10 @@ public class UnlinkOperation extends TransactionalOperation { RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(getThreadId()); } + if (writeLockName != null) { + RedissonLock lock = new RedissonTransactionalWriteLock(commandExecutor, writeLockName, transactionId); + lock.unlockAsync(getThreadId()); + } } public String getLockName() { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/set/AddOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/set/AddOperation.java index c52ef32d9..8c5e9bbe6 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/set/AddOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/set/AddOperation.java @@ -28,28 +28,32 @@ import org.redisson.command.CommandAsyncExecutor; */ public class AddOperation extends SetOperation { + private String readLockName; private Object value; - public AddOperation(RObject set, Object value, String transactionId, long threadId) { - this(set.getName(), set.getCodec(), value, transactionId, threadId); + public AddOperation(RObject set, Object value, String readLockName, String transactionId, long threadId) { + this(set.getName(), set.getCodec(), readLockName, value, transactionId, threadId); } - public AddOperation(String name, Codec codec, Object value, String transactionId, long threadId) { + public AddOperation(String name, Codec codec, String readLockName, Object value, String transactionId, long threadId) { super(name, codec, transactionId, threadId); this.value = value; + this.readLockName = readLockName; } @Override public void commit(CommandAsyncExecutor commandExecutor) { - RSet set = new RedissonSet(codec, commandExecutor, name, null); + RSet set = new RedissonSet<>(codec, commandExecutor, name, null); set.addAsync(value); getLock(set, commandExecutor, value).unlockAsync(threadId); + getReadLock(readLockName, commandExecutor).unlockAsync(threadId); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { - RSet set = new RedissonSet(codec, commandExecutor, name, null); + RSet set = new RedissonSet<>(codec, commandExecutor, name, null); getLock(set, commandExecutor, value).unlockAsync(threadId); + getReadLock(readLockName, commandExecutor).unlockAsync(threadId); } public Object getValue() { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/set/RemoveOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/set/RemoveOperation.java index 53f210699..7b3827f7a 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/set/RemoveOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/set/RemoveOperation.java @@ -28,15 +28,17 @@ import org.redisson.command.CommandAsyncExecutor; */ public class RemoveOperation extends SetOperation { + private String readLockName; private Object value; - public RemoveOperation(RObject set, Object value, String transactionId, long threadId) { - this(set.getName(), set.getCodec(), value, transactionId, threadId); + public RemoveOperation(RObject set, Object value, String readLockName, String transactionId, long threadId) { + this(set.getName(), set.getCodec(), readLockName, value, transactionId, threadId); } - public RemoveOperation(String name, Codec codec, Object value, String transactionId, long threadId) { + public RemoveOperation(String name, Codec codec, String readLockName, Object value, String transactionId, long threadId) { super(name, codec, transactionId, threadId); this.value = value; + this.readLockName = readLockName; } @Override @@ -44,12 +46,14 @@ public class RemoveOperation extends SetOperation { RSet set = new RedissonSet<>(codec, commandExecutor, name, null); set.removeAsync(value); getLock(set, commandExecutor, value).unlockAsync(threadId); + getReadLock(readLockName, commandExecutor).unlockAsync(threadId); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { RSet set = new RedissonSet<>(codec, commandExecutor, name, null); getLock(set, commandExecutor, value).unlockAsync(threadId); + getReadLock(readLockName, commandExecutor).unlockAsync(threadId); } public Object getValue() { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/set/SetOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/set/SetOperation.java index c8febc616..950a5fdb5 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/set/SetOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/set/SetOperation.java @@ -23,6 +23,7 @@ import org.redisson.api.RSetCache; import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.RedissonTransactionalLock; +import org.redisson.transaction.RedissonTransactionalReadLock; import org.redisson.transaction.operation.TransactionalOperation; /** @@ -54,4 +55,8 @@ public abstract class SetOperation extends TransactionalOperation { return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); } + protected RLock getReadLock(String readLockName, CommandAsyncExecutor commandExecutor) { + return new RedissonTransactionalReadLock(commandExecutor, readLockName, transactionId); + } + }