From 3ba3d543eacdfaf94c31427fc9d6b63dfbce77cd Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 23 Jun 2022 11:37:24 +0300 Subject: [PATCH] Feature - RedissonTransactionalBucket supports expire(), expireAt() and clearExpire() methods. #4249 --- .../RedissonTransactionalBucket.java | 71 ++++++++++++--- .../operation/ClearExpireOperation.java | 66 ++++++++++++++ .../operation/ExpireAtOperation.java | 86 ++++++++++++++++++ .../operation/ExpireOperation.java | 89 +++++++++++++++++++ .../transaction/operation/TouchOperation.java | 19 ++-- .../operation/UnlinkOperation.java | 11 ++- .../RedissonTransactionalBucketTest.java | 28 +++++- 7 files changed, 345 insertions(+), 25 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/transaction/operation/ClearExpireOperation.java create mode 100644 redisson/src/main/java/org/redisson/transaction/operation/ExpireAtOperation.java create mode 100644 redisson/src/main/java/org/redisson/transaction/operation/ExpireOperation.java diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java index 2726efbcb..b71fb8b1c 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java @@ -22,10 +22,7 @@ import org.redisson.api.RLock; import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.transaction.operation.DeleteOperation; -import org.redisson.transaction.operation.TouchOperation; -import org.redisson.transaction.operation.TransactionalOperation; -import org.redisson.transaction.operation.UnlinkOperation; +import org.redisson.transaction.operation.*; import org.redisson.transaction.operation.bucket.*; import java.util.List; @@ -50,6 +47,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { private final AtomicBoolean executed; private final List operations; private Object state; + private boolean hasExpiration; private final String transactionId; public RedissonTransactionalBucket(CommandAsyncExecutor commandExecutor, long timeout, String name, List operations, AtomicBoolean executed, String transactionId) { @@ -69,18 +67,60 @@ public class RedissonTransactionalBucket extends RedissonBucket { } @Override - public RFuture expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) { - throw new UnsupportedOperationException("expire method is not supported in transaction"); + protected RFuture expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) { + checkState(); + long currentThreadId = Thread.currentThread().getId(); + return executeLocked(() -> { + if (state != null) { + operations.add(new ExpireOperation(getRawName(), getLockName(), currentThreadId, transactionId, timeToLive, timeUnit, param, keys)); + hasExpiration = true; + return CompletableFuture.completedFuture(state != NULL); + } + + return isExistsAsync().thenApply(res -> { + operations.add(new ExpireOperation(getRawName(), getLockName(), currentThreadId, transactionId, timeToLive, timeUnit, param, keys)); + hasExpiration = res; + return res; + }); + }); } @Override protected RFuture expireAtAsync(long timestamp, String param, String... keys) { - throw new UnsupportedOperationException("expire method is not supported in transaction"); + checkState(); + long currentThreadId = Thread.currentThread().getId(); + return executeLocked(() -> { + if (state != null) { + operations.add(new ExpireAtOperation(getRawName(), getLockName(), currentThreadId, transactionId, timestamp, param, keys)); + hasExpiration = true; + return CompletableFuture.completedFuture(state != NULL); + } + + return isExistsAsync().thenApply(res -> { + operations.add(new ExpireAtOperation(getRawName(), getLockName(), currentThreadId, transactionId, timestamp, param, keys)); + hasExpiration = res; + return res; + }); + }); } @Override public RFuture clearExpireAsync() { - throw new UnsupportedOperationException("clearExpire method is not supported in transaction"); + checkState(); + long currentThreadId = Thread.currentThread().getId(); + return executeLocked(() -> { + if (hasExpiration) { + operations.add(new ClearExpireOperation(getRawName(), getLockName(), currentThreadId, transactionId)); + hasExpiration = false; + return CompletableFuture.completedFuture(true); + } + + return remainTimeToLiveAsync().thenApply(res -> { + operations.add(new ClearExpireOperation(getRawName(), getLockName(), currentThreadId, transactionId)); + hasExpiration = false; + return res > 0; + }); + }); } @Override @@ -126,12 +166,12 @@ public class RedissonTransactionalBucket extends RedissonBucket { long currentThreadId = Thread.currentThread().getId(); return executeLocked(() -> { if (state != null) { - operations.add(new TouchOperation(getRawName(), getLockName(), currentThreadId)); + operations.add(new TouchOperation(getRawName(), getLockName(), currentThreadId, transactionId)); return CompletableFuture.completedFuture(state != NULL); } return isExistsAsync().thenApply(res -> { - operations.add(new TouchOperation(getRawName(), getLockName(), currentThreadId)); + operations.add(new TouchOperation(getRawName(), getLockName(), currentThreadId, transactionId)); return res; }); }); @@ -143,7 +183,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { long currentThreadId = Thread.currentThread().getId(); return executeLocked(() -> { if (state != null) { - operations.add(new UnlinkOperation(getRawName(), getLockName(), currentThreadId)); + operations.add(new UnlinkOperation(getRawName(), getLockName(), currentThreadId, transactionId)); if (state == NULL) { return CompletableFuture.completedFuture(false); } else { @@ -153,7 +193,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { } return isExistsAsync().thenApply(res -> { - operations.add(new UnlinkOperation(getRawName(), getLockName(), currentThreadId)); + operations.add(new UnlinkOperation(getRawName(), getLockName(), currentThreadId, transactionId)); state = NULL; return res; }); @@ -281,9 +321,10 @@ public class RedissonTransactionalBucket extends RedissonBucket { return setAsync(newValue, new BucketSetOperation(getRawName(), getLockName(), getCodec(), newValue, transactionId, currentThreadId)); } - private RFuture setAsync(V newValue, TransactionalOperation operation) { + private RFuture setAsync(V newValue, BucketSetOperation operation) { checkState(); return executeLocked(() -> { + hasExpiration = operation.getTimeUnit() != null; operations.add(operation); state = Optional.ofNullable((Object) newValue).orElse(NULL); return CompletableFuture.completedFuture(null); @@ -308,13 +349,14 @@ public class RedissonTransactionalBucket extends RedissonBucket { return trySet(value, new BucketTrySetOperation(getRawName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId, currentThreadId)); } - private RFuture trySet(V newValue, TransactionalOperation operation) { + private RFuture trySet(V newValue, BucketTrySetOperation operation) { checkState(); return executeLocked(() -> { if (state != null) { operations.add(operation); if (state == NULL) { state = Optional.ofNullable((Object) newValue).orElse(NULL); + hasExpiration = operation.getTimeUnit() != null; return CompletableFuture.completedFuture(true); } else { return CompletableFuture.completedFuture(false); @@ -324,6 +366,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { return getAsync().thenApply(res -> { operations.add(operation); if (res == null) { + hasExpiration = operation.getTimeUnit() != null; state = Optional.ofNullable((Object) newValue).orElse(NULL); return true; } diff --git a/redisson/src/main/java/org/redisson/transaction/operation/ClearExpireOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/ClearExpireOperation.java new file mode 100644 index 000000000..4b9864e8e --- /dev/null +++ b/redisson/src/main/java/org/redisson/transaction/operation/ClearExpireOperation.java @@ -0,0 +1,66 @@ +/** + * Copyright (c) 2013-2021 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.transaction.operation; + +import org.redisson.RedissonKeys; +import org.redisson.RedissonLock; +import org.redisson.api.RKeys; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.transaction.RedissonTransactionalLock; + +/** + * + * @author Nikita Koksharov + * + */ +public class ClearExpireOperation extends TransactionalOperation { + + private String lockName; + private String transactionId; + + public ClearExpireOperation(String name) { + this(name, null, 0, null); + } + + public ClearExpireOperation(String name, String lockName, long threadId, String transactionId) { + super(name, null, threadId); + this.lockName = lockName; + this.transactionId = transactionId; + } + + @Override + public void commit(CommandAsyncExecutor commandExecutor) { + RKeys keys = new RedissonKeys(commandExecutor); + keys.clearExpireAsync(getName()); + if (lockName != null) { + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); + lock.unlockAsync(getThreadId()); + } + } + + @Override + public void rollback(CommandAsyncExecutor commandExecutor) { + if (lockName != null) { + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); + lock.unlockAsync(getThreadId()); + } + } + + public String getLockName() { + return lockName; + } + +} diff --git a/redisson/src/main/java/org/redisson/transaction/operation/ExpireAtOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/ExpireAtOperation.java new file mode 100644 index 000000000..93c1db79b --- /dev/null +++ b/redisson/src/main/java/org/redisson/transaction/operation/ExpireAtOperation.java @@ -0,0 +1,86 @@ +/** + * Copyright (c) 2013-2021 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.transaction.operation; + +import org.redisson.RedissonBucket; +import org.redisson.RedissonLock; +import org.redisson.api.RFuture; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.transaction.RedissonTransactionalLock; + +/** + * + * @author Nikita Koksharov + * + */ +public class ExpireAtOperation extends TransactionalOperation { + + public static final class RedissonBucketExtended extends RedissonBucket { + + public RedissonBucketExtended(CommandAsyncExecutor connectionManager, String name) { + super(connectionManager, name); + } + + @Override + protected RFuture expireAtAsync(long timestamp, String param, String... keys) { + return super.expireAtAsync(timestamp, param, keys); + } + + } + + + private String lockName; + private String transactionId; + private long timestamp; + private String param; + private String[] keys; + + public ExpireAtOperation(String name) { + this(name, null, 0, null, 0, null, (String[]) null); + } + + public ExpireAtOperation(String name, String lockName, long threadId, String transactionId, long timestamp, String param, String... keys) { + super(name, null, threadId); + this.lockName = lockName; + this.transactionId = transactionId; + this.timestamp = timestamp; + this.param = param; + this.keys = keys; + } + + @Override + public void commit(CommandAsyncExecutor commandExecutor) { + RedissonBucketExtended bucket = new RedissonBucketExtended(commandExecutor, name); + bucket.expireAtAsync(timestamp, param, keys); + if (lockName != null) { + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); + lock.unlockAsync(getThreadId()); + } + } + + @Override + public void rollback(CommandAsyncExecutor commandExecutor) { + if (lockName != null) { + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); + lock.unlockAsync(getThreadId()); + } + } + + public String getLockName() { + return lockName; + } + +} diff --git a/redisson/src/main/java/org/redisson/transaction/operation/ExpireOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/ExpireOperation.java new file mode 100644 index 000000000..a2b7d38f0 --- /dev/null +++ b/redisson/src/main/java/org/redisson/transaction/operation/ExpireOperation.java @@ -0,0 +1,89 @@ +/** + * Copyright (c) 2013-2021 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.transaction.operation; + +import org.redisson.RedissonBucket; +import org.redisson.RedissonLock; +import org.redisson.api.RFuture; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.transaction.RedissonTransactionalLock; + +import java.util.concurrent.TimeUnit; + +/** + * + * @author Nikita Koksharov + * + */ +public class ExpireOperation extends TransactionalOperation { + + public static final class RedissonBucketExtended extends RedissonBucket { + + public RedissonBucketExtended(CommandAsyncExecutor connectionManager, String name) { + super(connectionManager, name); + } + + @Override + public RFuture expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) { + return super.expireAsync(timeToLive, timeUnit, param, keys); + } + } + + + private String lockName; + private String transactionId; + private long timeToLive; + private TimeUnit timeUnit; + private String param; + private String[] keys; + + public ExpireOperation(String name) { + this(name, null, 0, null, 0, null, null, (String[]) null); + } + + public ExpireOperation(String name, String lockName, long threadId, String transactionId, long timeToLive, TimeUnit timeUnit, String param, String... keys) { + super(name, null, threadId); + this.lockName = lockName; + this.transactionId = transactionId; + this.timeToLive = timeToLive; + this.timeUnit = timeUnit; + this.param = param; + this.keys = keys; + } + + @Override + public void commit(CommandAsyncExecutor commandExecutor) { + RedissonBucketExtended bucket = new RedissonBucketExtended(commandExecutor, name); + bucket.expireAsync(timeToLive, timeUnit, param, keys); + if (lockName != null) { + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); + lock.unlockAsync(getThreadId()); + } + } + + @Override + public void rollback(CommandAsyncExecutor commandExecutor) { + if (lockName != null) { + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); + lock.unlockAsync(getThreadId()); + } + } + + public String getLockName() { + return lockName; + } + +} diff --git a/redisson/src/main/java/org/redisson/transaction/operation/TouchOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/TouchOperation.java index 9e0fecd1e..8c064f6dd 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/TouchOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/TouchOperation.java @@ -19,6 +19,7 @@ import org.redisson.RedissonKeys; import org.redisson.RedissonLock; import org.redisson.api.RKeys; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.transaction.RedissonTransactionalLock; /** * @@ -28,28 +29,34 @@ import org.redisson.command.CommandAsyncExecutor; public class TouchOperation extends TransactionalOperation { private String lockName; + private String transactionId; public TouchOperation(String name) { - this(name, null, 0); + this(name, null, 0, null); } - public TouchOperation(String name, String lockName, long threadId) { + public TouchOperation(String name, String lockName, long threadId, String transactionId) { super(name, null, threadId); this.lockName = lockName; + this.transactionId = transactionId; } @Override public void commit(CommandAsyncExecutor commandExecutor) { RKeys keys = new RedissonKeys(commandExecutor); keys.touchAsync(getName()); - RedissonLock lock = new RedissonLock(commandExecutor, lockName); - lock.unlockAsync(getThreadId()); + if (lockName != null) { + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); + lock.unlockAsync(getThreadId()); + } } @Override public void rollback(CommandAsyncExecutor commandExecutor) { - RedissonLock lock = new RedissonLock(commandExecutor, lockName); - lock.unlockAsync(getThreadId()); + if (lockName != null) { + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); + lock.unlockAsync(getThreadId()); + } } public String getLockName() { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/UnlinkOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/UnlinkOperation.java index 65da155fc..a50906a99 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/UnlinkOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/UnlinkOperation.java @@ -19,6 +19,7 @@ import org.redisson.RedissonKeys; import org.redisson.RedissonLock; import org.redisson.api.RKeys; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.transaction.RedissonTransactionalLock; /** * @@ -28,14 +29,16 @@ import org.redisson.command.CommandAsyncExecutor; public class UnlinkOperation extends TransactionalOperation { private String lockName; + private String transactionId; public UnlinkOperation(String name) { - this(name, null, 0); + this(name, null, 0, null); } - public UnlinkOperation(String name, String lockName, long threadId) { + public UnlinkOperation(String name, String lockName, long threadId, String transactionId) { super(name, null, threadId); this.lockName = lockName; + this.transactionId = transactionId; } @Override @@ -43,7 +46,7 @@ public class UnlinkOperation extends TransactionalOperation { RKeys keys = new RedissonKeys(commandExecutor); keys.unlinkAsync(getName()); if (lockName != null) { - RedissonLock lock = new RedissonLock(commandExecutor, lockName); + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(getThreadId()); } } @@ -51,7 +54,7 @@ public class UnlinkOperation extends TransactionalOperation { @Override public void rollback(CommandAsyncExecutor commandExecutor) { if (lockName != null) { - RedissonLock lock = new RedissonLock(commandExecutor, lockName); + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(getThreadId()); } } diff --git a/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketTest.java b/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketTest.java index 75b67ee79..a34bf58ba 100644 --- a/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketTest.java +++ b/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketTest.java @@ -2,6 +2,7 @@ package org.redisson.transaction; import static org.assertj.core.api.Assertions.assertThat; +import java.time.Duration; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Assertions; @@ -51,7 +52,32 @@ public class RedissonTransactionalBucketTest extends BaseTest { assertThat(redisson.getKeys().count()).isEqualTo(1); assertThat(b.get()).isEqualTo("234"); } - + + @Test + public void testExpire() throws InterruptedException { + RBucket b = redisson.getBucket("test"); + b.set("123"); + + RTransaction transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBucket bucket = transaction.getBucket("test"); + assertThat(bucket.clearExpire()).isFalse(); + assertThat(bucket.expire(Duration.ofSeconds(2))).isTrue(); + assertThat(bucket.clearExpire()).isTrue(); + transaction.commit(); + + Thread.sleep(2200); + + assertThat(b.get()).isEqualTo("123"); + + RTransaction transaction2 = redisson.createTransaction(TransactionOptions.defaults()); + RBucket bucket2 = transaction2.getBucket("test"); + assertThat(bucket2.expire(Duration.ofSeconds(1))).isTrue(); + transaction2.commit(); + + Thread.sleep(1100); + assertThat(b.get()).isNull(); + } + @Test public void testGetAndSet() { RBucket b = redisson.getBucket("test");