Feature - RedissonTransactionalBucket supports expire(), expireAt() and clearExpire() methods. #4249

pull/4385/head
Nikita Koksharov 3 years ago
parent 6fca7e9a3f
commit 3ba3d543ea

@ -22,10 +22,7 @@ import org.redisson.api.RLock;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.transaction.operation.DeleteOperation;
import org.redisson.transaction.operation.TouchOperation;
import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.UnlinkOperation;
import org.redisson.transaction.operation.*;
import org.redisson.transaction.operation.bucket.*;
import java.util.List;
@ -50,6 +47,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
private final AtomicBoolean executed;
private final List<TransactionalOperation> operations;
private Object state;
private boolean hasExpiration;
private final String transactionId;
public RedissonTransactionalBucket(CommandAsyncExecutor commandExecutor, long timeout, String name, List<TransactionalOperation> operations, AtomicBoolean executed, String transactionId) {
@ -69,18 +67,60 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
}
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
throw new UnsupportedOperationException("expire method is not supported in transaction");
protected RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
checkState();
long currentThreadId = Thread.currentThread().getId();
return executeLocked(() -> {
if (state != null) {
operations.add(new ExpireOperation(getRawName(), getLockName(), currentThreadId, transactionId, timeToLive, timeUnit, param, keys));
hasExpiration = true;
return CompletableFuture.completedFuture(state != NULL);
}
return isExistsAsync().thenApply(res -> {
operations.add(new ExpireOperation(getRawName(), getLockName(), currentThreadId, transactionId, timeToLive, timeUnit, param, keys));
hasExpiration = res;
return res;
});
});
}
@Override
protected RFuture<Boolean> expireAtAsync(long timestamp, String param, String... keys) {
throw new UnsupportedOperationException("expire method is not supported in transaction");
checkState();
long currentThreadId = Thread.currentThread().getId();
return executeLocked(() -> {
if (state != null) {
operations.add(new ExpireAtOperation(getRawName(), getLockName(), currentThreadId, transactionId, timestamp, param, keys));
hasExpiration = true;
return CompletableFuture.completedFuture(state != NULL);
}
return isExistsAsync().thenApply(res -> {
operations.add(new ExpireAtOperation(getRawName(), getLockName(), currentThreadId, transactionId, timestamp, param, keys));
hasExpiration = res;
return res;
});
});
}
@Override
public RFuture<Boolean> clearExpireAsync() {
throw new UnsupportedOperationException("clearExpire method is not supported in transaction");
checkState();
long currentThreadId = Thread.currentThread().getId();
return executeLocked(() -> {
if (hasExpiration) {
operations.add(new ClearExpireOperation(getRawName(), getLockName(), currentThreadId, transactionId));
hasExpiration = false;
return CompletableFuture.completedFuture(true);
}
return remainTimeToLiveAsync().thenApply(res -> {
operations.add(new ClearExpireOperation(getRawName(), getLockName(), currentThreadId, transactionId));
hasExpiration = false;
return res > 0;
});
});
}
@Override
@ -126,12 +166,12 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
long currentThreadId = Thread.currentThread().getId();
return executeLocked(() -> {
if (state != null) {
operations.add(new TouchOperation(getRawName(), getLockName(), currentThreadId));
operations.add(new TouchOperation(getRawName(), getLockName(), currentThreadId, transactionId));
return CompletableFuture.completedFuture(state != NULL);
}
return isExistsAsync().thenApply(res -> {
operations.add(new TouchOperation(getRawName(), getLockName(), currentThreadId));
operations.add(new TouchOperation(getRawName(), getLockName(), currentThreadId, transactionId));
return res;
});
});
@ -143,7 +183,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
long currentThreadId = Thread.currentThread().getId();
return executeLocked(() -> {
if (state != null) {
operations.add(new UnlinkOperation(getRawName(), getLockName(), currentThreadId));
operations.add(new UnlinkOperation(getRawName(), getLockName(), currentThreadId, transactionId));
if (state == NULL) {
return CompletableFuture.completedFuture(false);
} else {
@ -153,7 +193,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
}
return isExistsAsync().thenApply(res -> {
operations.add(new UnlinkOperation(getRawName(), getLockName(), currentThreadId));
operations.add(new UnlinkOperation(getRawName(), getLockName(), currentThreadId, transactionId));
state = NULL;
return res;
});
@ -281,9 +321,10 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
return setAsync(newValue, new BucketSetOperation<V>(getRawName(), getLockName(), getCodec(), newValue, transactionId, currentThreadId));
}
private RFuture<Void> setAsync(V newValue, TransactionalOperation operation) {
private RFuture<Void> setAsync(V newValue, BucketSetOperation operation) {
checkState();
return executeLocked(() -> {
hasExpiration = operation.getTimeUnit() != null;
operations.add(operation);
state = Optional.ofNullable((Object) newValue).orElse(NULL);
return CompletableFuture.completedFuture(null);
@ -308,13 +349,14 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
return trySet(value, new BucketTrySetOperation<V>(getRawName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId, currentThreadId));
}
private RFuture<Boolean> trySet(V newValue, TransactionalOperation operation) {
private RFuture<Boolean> trySet(V newValue, BucketTrySetOperation operation) {
checkState();
return executeLocked(() -> {
if (state != null) {
operations.add(operation);
if (state == NULL) {
state = Optional.ofNullable((Object) newValue).orElse(NULL);
hasExpiration = operation.getTimeUnit() != null;
return CompletableFuture.completedFuture(true);
} else {
return CompletableFuture.completedFuture(false);
@ -324,6 +366,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
return getAsync().thenApply(res -> {
operations.add(operation);
if (res == null) {
hasExpiration = operation.getTimeUnit() != null;
state = Optional.ofNullable((Object) newValue).orElse(NULL);
return true;
}

@ -0,0 +1,66 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.transaction.operation;
import org.redisson.RedissonKeys;
import org.redisson.RedissonLock;
import org.redisson.api.RKeys;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
/**
*
* @author Nikita Koksharov
*
*/
public class ClearExpireOperation extends TransactionalOperation {
private String lockName;
private String transactionId;
public ClearExpireOperation(String name) {
this(name, null, 0, null);
}
public ClearExpireOperation(String name, String lockName, long threadId, String transactionId) {
super(name, null, threadId);
this.lockName = lockName;
this.transactionId = transactionId;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RKeys keys = new RedissonKeys(commandExecutor);
keys.clearExpireAsync(getName());
if (lockName != null) {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
if (lockName != null) {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
public String getLockName() {
return lockName;
}
}

@ -0,0 +1,86 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.transaction.operation;
import org.redisson.RedissonBucket;
import org.redisson.RedissonLock;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
/**
*
* @author Nikita Koksharov
*
*/
public class ExpireAtOperation extends TransactionalOperation {
public static final class RedissonBucketExtended extends RedissonBucket {
public RedissonBucketExtended(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
@Override
protected RFuture<Boolean> expireAtAsync(long timestamp, String param, String... keys) {
return super.expireAtAsync(timestamp, param, keys);
}
}
private String lockName;
private String transactionId;
private long timestamp;
private String param;
private String[] keys;
public ExpireAtOperation(String name) {
this(name, null, 0, null, 0, null, (String[]) null);
}
public ExpireAtOperation(String name, String lockName, long threadId, String transactionId, long timestamp, String param, String... keys) {
super(name, null, threadId);
this.lockName = lockName;
this.transactionId = transactionId;
this.timestamp = timestamp;
this.param = param;
this.keys = keys;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RedissonBucketExtended bucket = new RedissonBucketExtended(commandExecutor, name);
bucket.expireAtAsync(timestamp, param, keys);
if (lockName != null) {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
if (lockName != null) {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
public String getLockName() {
return lockName;
}
}

@ -0,0 +1,89 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.transaction.operation;
import org.redisson.RedissonBucket;
import org.redisson.RedissonLock;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
import java.util.concurrent.TimeUnit;
/**
*
* @author Nikita Koksharov
*
*/
public class ExpireOperation extends TransactionalOperation {
public static final class RedissonBucketExtended extends RedissonBucket {
public RedissonBucketExtended(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
return super.expireAsync(timeToLive, timeUnit, param, keys);
}
}
private String lockName;
private String transactionId;
private long timeToLive;
private TimeUnit timeUnit;
private String param;
private String[] keys;
public ExpireOperation(String name) {
this(name, null, 0, null, 0, null, null, (String[]) null);
}
public ExpireOperation(String name, String lockName, long threadId, String transactionId, long timeToLive, TimeUnit timeUnit, String param, String... keys) {
super(name, null, threadId);
this.lockName = lockName;
this.transactionId = transactionId;
this.timeToLive = timeToLive;
this.timeUnit = timeUnit;
this.param = param;
this.keys = keys;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RedissonBucketExtended bucket = new RedissonBucketExtended(commandExecutor, name);
bucket.expireAsync(timeToLive, timeUnit, param, keys);
if (lockName != null) {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
if (lockName != null) {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
public String getLockName() {
return lockName;
}
}

@ -19,6 +19,7 @@ import org.redisson.RedissonKeys;
import org.redisson.RedissonLock;
import org.redisson.api.RKeys;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
/**
*
@ -28,28 +29,34 @@ import org.redisson.command.CommandAsyncExecutor;
public class TouchOperation extends TransactionalOperation {
private String lockName;
private String transactionId;
public TouchOperation(String name) {
this(name, null, 0);
this(name, null, 0, null);
}
public TouchOperation(String name, String lockName, long threadId) {
public TouchOperation(String name, String lockName, long threadId, String transactionId) {
super(name, null, threadId);
this.lockName = lockName;
this.transactionId = transactionId;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RKeys keys = new RedissonKeys(commandExecutor);
keys.touchAsync(getName());
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
lock.unlockAsync(getThreadId());
if (lockName != null) {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
lock.unlockAsync(getThreadId());
if (lockName != null) {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
public String getLockName() {

@ -19,6 +19,7 @@ import org.redisson.RedissonKeys;
import org.redisson.RedissonLock;
import org.redisson.api.RKeys;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
/**
*
@ -28,14 +29,16 @@ import org.redisson.command.CommandAsyncExecutor;
public class UnlinkOperation extends TransactionalOperation {
private String lockName;
private String transactionId;
public UnlinkOperation(String name) {
this(name, null, 0);
this(name, null, 0, null);
}
public UnlinkOperation(String name, String lockName, long threadId) {
public UnlinkOperation(String name, String lockName, long threadId, String transactionId) {
super(name, null, threadId);
this.lockName = lockName;
this.transactionId = transactionId;
}
@Override
@ -43,7 +46,7 @@ public class UnlinkOperation extends TransactionalOperation {
RKeys keys = new RedissonKeys(commandExecutor);
keys.unlinkAsync(getName());
if (lockName != null) {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
}
@ -51,7 +54,7 @@ public class UnlinkOperation extends TransactionalOperation {
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
if (lockName != null) {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync(getThreadId());
}
}

@ -2,6 +2,7 @@ package org.redisson.transaction;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Assertions;
@ -51,7 +52,32 @@ public class RedissonTransactionalBucketTest extends BaseTest {
assertThat(redisson.getKeys().count()).isEqualTo(1);
assertThat(b.get()).isEqualTo("234");
}
@Test
public void testExpire() throws InterruptedException {
RBucket<String> b = redisson.getBucket("test");
b.set("123");
RTransaction transaction = redisson.createTransaction(TransactionOptions.defaults());
RBucket<String> bucket = transaction.getBucket("test");
assertThat(bucket.clearExpire()).isFalse();
assertThat(bucket.expire(Duration.ofSeconds(2))).isTrue();
assertThat(bucket.clearExpire()).isTrue();
transaction.commit();
Thread.sleep(2200);
assertThat(b.get()).isEqualTo("123");
RTransaction transaction2 = redisson.createTransaction(TransactionOptions.defaults());
RBucket<String> bucket2 = transaction2.getBucket("test");
assertThat(bucket2.expire(Duration.ofSeconds(1))).isTrue();
transaction2.commit();
Thread.sleep(1100);
assertThat(b.get()).isNull();
}
@Test
public void testGetAndSet() {
RBucket<String> b = redisson.getBucket("test");

Loading…
Cancel
Save