Fixed - Reactive Transactions aren't unlocking transactional locks #3697

pull/3715/head
Nikita Koksharov 4 years ago
parent e141872c02
commit f7fbb9fbf7

@ -119,7 +119,7 @@ public class BaseTransactionalMap<K, V> {
}
public RFuture<Boolean> unlinkAsync(CommandAsyncExecutor commandExecutor) {
return deleteAsync(commandExecutor, new UnlinkOperation(map.getName(), null));
return deleteAsync(commandExecutor, new UnlinkOperation(map.getName()));
}
public RFuture<Boolean> touchAsync(CommandAsyncExecutor commandExecutor) {

@ -137,11 +137,12 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
public RFuture<Boolean> touchAsync() {
checkState();
RPromise<Boolean> result = new RedissonPromise<Boolean>();
long currentThreadId = Thread.currentThread().getId();
executeLocked(result, new Runnable() {
@Override
public void run() {
if (state != null) {
operations.add(new TouchOperation(getRawName(), getLockName()));
operations.add(new TouchOperation(getRawName(), getLockName(), currentThreadId));
result.trySuccess(state != NULL);
return;
}
@ -152,7 +153,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
return;
}
operations.add(new TouchOperation(getRawName(), getLockName()));
operations.add(new TouchOperation(getRawName(), getLockName(), currentThreadId));
result.trySuccess(res);
});
}
@ -164,11 +165,12 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
public RFuture<Boolean> unlinkAsync() {
checkState();
RPromise<Boolean> result = new RedissonPromise<Boolean>();
long currentThreadId = Thread.currentThread().getId();
executeLocked(result, new Runnable() {
@Override
public void run() {
if (state != null) {
operations.add(new UnlinkOperation(getRawName(), getLockName()));
operations.add(new UnlinkOperation(getRawName(), getLockName(), currentThreadId));
if (state == NULL) {
result.trySuccess(false);
} else {
@ -184,7 +186,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
return;
}
operations.add(new UnlinkOperation(getRawName(), getLockName()));
operations.add(new UnlinkOperation(getRawName(), getLockName(), currentThreadId));
state = NULL;
result.trySuccess(res);
});
@ -246,11 +248,12 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
public RFuture<Boolean> compareAndSetAsync(V expect, V update) {
checkState();
RPromise<Boolean> result = new RedissonPromise<>();
long currentThreadId = Thread.currentThread().getId();
executeLocked(result, new Runnable() {
@Override
public void run() {
if (state != null) {
operations.add(new BucketCompareAndSetOperation<V>(getRawName(), getLockName(), getCodec(), expect, update, transactionId));
operations.add(new BucketCompareAndSetOperation<V>(getRawName(), getLockName(), getCodec(), expect, update, transactionId, currentThreadId));
if ((state == NULL && expect == null)
|| isEquals(state, expect)) {
if (update == null) {
@ -271,7 +274,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
return;
}
operations.add(new BucketCompareAndSetOperation<V>(getRawName(), getLockName(), getCodec(), expect, update, transactionId));
operations.add(new BucketCompareAndSetOperation<V>(getRawName(), getLockName(), getCodec(), expect, update, transactionId, currentThreadId));
if ((res == null && expect == null)
|| isEquals(res, expect)) {
if (update == null) {
@ -347,6 +350,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
public RFuture<V> getAndDeleteAsync() {
checkState();
RPromise<V> result = new RedissonPromise<V>();
long currentThreadId = Thread.currentThread().getId();
executeLocked(result, new Runnable() {
@Override
public void run() {
@ -357,7 +361,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
} else {
prevValue = state;
}
operations.add(new BucketGetAndDeleteOperation<V>(getRawName(), getLockName(), getCodec(), transactionId));
operations.add(new BucketGetAndDeleteOperation<V>(getRawName(), getLockName(), getCodec(), transactionId, currentThreadId));
state = NULL;
result.trySuccess((V) prevValue);
return;
@ -370,7 +374,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
}
state = NULL;
operations.add(new BucketGetAndDeleteOperation<V>(getRawName(), getLockName(), getCodec(), transactionId));
operations.add(new BucketGetAndDeleteOperation<V>(getRawName(), getLockName(), getCodec(), transactionId, currentThreadId));
result.trySuccess(res);
});
}
@ -380,7 +384,8 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
@Override
public RFuture<Void> setAsync(V newValue) {
return setAsync(newValue, new BucketSetOperation<V>(getRawName(), getLockName(), getCodec(), newValue, transactionId));
long currentThreadId = Thread.currentThread().getId();
return setAsync(newValue, new BucketSetOperation<V>(getRawName(), getLockName(), getCodec(), newValue, transactionId, currentThreadId));
}
private RFuture<Void> setAsync(V newValue, TransactionalOperation operation) {
@ -403,17 +408,20 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
@Override
public RFuture<Void> setAsync(V value, long timeToLive, TimeUnit timeUnit) {
return setAsync(value, new BucketSetOperation<V>(getRawName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId));
long currentThreadId = Thread.currentThread().getId();
return setAsync(value, new BucketSetOperation<V>(getRawName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId, currentThreadId));
}
@Override
public RFuture<Boolean> trySetAsync(V newValue) {
return trySet(newValue, new BucketTrySetOperation<V>(getRawName(), getLockName(), getCodec(), newValue, transactionId));
long currentThreadId = Thread.currentThread().getId();
return trySet(newValue, new BucketTrySetOperation<V>(getRawName(), getLockName(), getCodec(), newValue, transactionId, currentThreadId));
}
@Override
public RFuture<Boolean> trySetAsync(V value, long timeToLive, TimeUnit timeUnit) {
return trySet(value, new BucketTrySetOperation<V>(getRawName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId));
long currentThreadId = Thread.currentThread().getId();
return trySet(value, new BucketTrySetOperation<V>(getRawName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId, currentThreadId));
}
private RFuture<Boolean> trySet(V newValue, TransactionalOperation operation) {
@ -460,7 +468,6 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
return result;
}
private boolean isEquals(Object value, Object oldValue) {
ByteBuf valueBuf = encode(value);
ByteBuf oldValueBuf = encode(oldValue);

@ -112,9 +112,10 @@ public class RedissonTransactionalBuckets extends RedissonBuckets {
checkState();
RPromise<Void> result = new RedissonPromise<>();
long currentThreadId = Thread.currentThread().getId();
executeLocked(result, () -> {
for (Entry<String, ?> entry : buckets.entrySet()) {
operations.add(new BucketSetOperation<>(entry.getKey(), getLockName(entry.getKey()), codec, entry.getValue(), transactionId));
operations.add(new BucketSetOperation<>(entry.getKey(), getLockName(entry.getKey()), codec, entry.getValue(), transactionId, currentThreadId));
if (entry.getValue() == null) {
state.put(entry.getKey(), NULL);
} else {

@ -30,11 +30,11 @@ public class TouchOperation extends TransactionalOperation {
private String lockName;
public TouchOperation(String name) {
this(name, null);
this(name, null, 0);
}
public TouchOperation(String name, String lockName) {
super(name, null);
public TouchOperation(String name, String lockName, long threadId) {
super(name, null, threadId);
this.lockName = lockName;
}
@ -43,13 +43,13 @@ public class TouchOperation extends TransactionalOperation {
RKeys keys = new RedissonKeys(commandExecutor);
keys.touchAsync(getName());
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
lock.unlockAsync();
lock.unlockAsync(getThreadId());
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
lock.unlockAsync();
lock.unlockAsync(getThreadId());
}
public String getLockName() {

@ -27,6 +27,7 @@ public abstract class TransactionalOperation {
protected Codec codec;
protected String name;
protected long threadId;
public TransactionalOperation() {
}
@ -35,7 +36,17 @@ public abstract class TransactionalOperation {
this.name = name;
this.codec = codec;
}
public TransactionalOperation(String name, Codec codec, long threadId) {
this.name = name;
this.codec = codec;
this.threadId = threadId;
}
public long getThreadId() {
return threadId;
}
public Codec getCodec() {
return codec;
}

@ -30,11 +30,11 @@ public class UnlinkOperation extends TransactionalOperation {
private String lockName;
public UnlinkOperation(String name) {
this(name, null);
this(name, null, 0);
}
public UnlinkOperation(String name, String lockName) {
super(name, null);
public UnlinkOperation(String name, String lockName, long threadId) {
super(name, null, threadId);
this.lockName = lockName;
}
@ -44,7 +44,7 @@ public class UnlinkOperation extends TransactionalOperation {
keys.unlinkAsync(getName());
if (lockName != null) {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
lock.unlockAsync();
lock.unlockAsync(getThreadId());
}
}
@ -52,7 +52,7 @@ public class UnlinkOperation extends TransactionalOperation {
public void rollback(CommandAsyncExecutor commandExecutor) {
if (lockName != null) {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
lock.unlockAsync();
lock.unlockAsync(getThreadId());
}
}

@ -35,8 +35,8 @@ public class BucketCompareAndSetOperation<V> extends TransactionalOperation {
private String lockName;
private String transactionId;
public BucketCompareAndSetOperation(String name, String lockName, Codec codec, V expected, V value, String transactionId) {
super(name, codec);
public BucketCompareAndSetOperation(String name, String lockName, Codec codec, V expected, V value, String transactionId, long threadId) {
super(name, codec, threadId);
this.expected = expected;
this.value = value;
this.lockName = lockName;
@ -48,13 +48,13 @@ public class BucketCompareAndSetOperation<V> extends TransactionalOperation {
RedissonBucket<V> bucket = new RedissonBucket<V>(codec, commandExecutor, name);
bucket.compareAndSetAsync(expected, value);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
lock.unlockAsync(getThreadId());
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
lock.unlockAsync(getThreadId());
}
public V getExpected() {

@ -33,8 +33,8 @@ public class BucketGetAndDeleteOperation<V> extends TransactionalOperation {
private String lockName;
private String transactionId;
public BucketGetAndDeleteOperation(String name, String lockName, Codec codec, String transactionId) {
super(name, codec);
public BucketGetAndDeleteOperation(String name, String lockName, Codec codec, String transactionId, long threadId) {
super(name, codec, threadId);
this.lockName = lockName;
this.transactionId = transactionId;
}
@ -44,13 +44,13 @@ public class BucketGetAndDeleteOperation<V> extends TransactionalOperation {
RedissonBucket<V> bucket = new RedissonBucket<V>(codec, commandExecutor, name);
bucket.getAndDeleteAsync();
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
lock.unlockAsync(getThreadId());
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
lock.unlockAsync(getThreadId());
}
public String getLockName() {

@ -46,7 +46,7 @@ public class BucketGetAndSetOperation<V> extends TransactionalOperation {
}
public BucketGetAndSetOperation(String name, String lockName, Codec codec, Object value, String transactionId) {
super(name, codec);
super(name, codec, Thread.currentThread().getId());
this.value = value;
this.lockName = lockName;
this.transactionId = transactionId;
@ -61,13 +61,13 @@ public class BucketGetAndSetOperation<V> extends TransactionalOperation {
bucket.getAndSetAsync((V) value);
}
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
lock.unlockAsync(getThreadId());
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
lock.unlockAsync(getThreadId());
}
public Object getValue() {

@ -38,14 +38,14 @@ public class BucketSetOperation<V> extends TransactionalOperation {
private TimeUnit timeUnit;
private String transactionId;
public BucketSetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit, String transactionId) {
this(name, lockName, codec, value, transactionId);
public BucketSetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit, String transactionId, long threadId) {
this(name, lockName, codec, value, transactionId, threadId);
this.timeToLive = timeToLive;
this.timeUnit = timeUnit;
}
public BucketSetOperation(String name, String lockName, Codec codec, Object value, String transactionId) {
super(name, codec);
public BucketSetOperation(String name, String lockName, Codec codec, Object value, String transactionId, long threadId) {
super(name, codec, threadId);
this.value = value;
this.lockName = lockName;
this.transactionId = transactionId;
@ -60,13 +60,13 @@ public class BucketSetOperation<V> extends TransactionalOperation {
bucket.setAsync((V) value);
}
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
lock.unlockAsync(getThreadId());
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
lock.unlockAsync(getThreadId());
}
public Object getValue() {

@ -38,14 +38,14 @@ public class BucketTrySetOperation<V> extends TransactionalOperation {
private long timeToLive;
private TimeUnit timeUnit;
public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit, String transactionId) {
this(name, lockName, codec, value, transactionId);
public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit, String transactionId, long threadId) {
this(name, lockName, codec, value, transactionId, threadId);
this.timeToLive = timeToLive;
this.timeUnit = timeUnit;
}
public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, String transactionId) {
super(name, codec);
public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, String transactionId, long threadId) {
super(name, codec, threadId);
this.value = value;
this.lockName = lockName;
this.transactionId = transactionId;
@ -60,13 +60,13 @@ public class BucketTrySetOperation<V> extends TransactionalOperation {
bucket.trySetAsync((V) value);
}
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
lock.unlockAsync(getThreadId());
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
lock.unlockAsync(getThreadId());
}
public Object getValue() {

@ -10,9 +10,29 @@ import org.redisson.BaseReactiveTest;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RTransactionReactive;
import org.redisson.api.TransactionOptions;
import org.redisson.client.codec.LongCodec;
public class RedissonTransactionalBucketReactiveTest extends BaseReactiveTest {
@Test
public void testLock() {
redisson.getBucket("e:1", LongCodec.INSTANCE).set(1L).block();
redisson.getBucket("e:2", LongCodec.INSTANCE).set(1L).block();
for (int j = 0; j < 10; j++) {
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults().timeout(30, TimeUnit.SECONDS));
RBucketReactive<Long> e1 = transaction.getBucket("e:1", LongCodec.INSTANCE);
RBucketReactive<Long> e2 = transaction.getBucket("e:2", LongCodec.INSTANCE);
e1.get().map(i -> i + 1).flatMap(e1::set).block();
e2.get().map(i -> i - 1).flatMap(e2::set).block();
transaction.commit().block();
}
assertThat(redisson.getBucket("e:1", LongCodec.INSTANCE).get().block()).isEqualTo(11L);
assertThat(redisson.getBucket("e:2", LongCodec.INSTANCE).get().block()).isEqualTo(-9L);
}
@Test
public void testTimeout() throws InterruptedException {
RBucketReactive<String> b = redisson.getBucket("test");

Loading…
Cancel
Save