diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java index 59c15855e..7640df05f 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java @@ -119,7 +119,7 @@ public class BaseTransactionalMap { } public RFuture unlinkAsync(CommandAsyncExecutor commandExecutor) { - return deleteAsync(commandExecutor, new UnlinkOperation(map.getName(), null)); + return deleteAsync(commandExecutor, new UnlinkOperation(map.getName())); } public RFuture touchAsync(CommandAsyncExecutor commandExecutor) { diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java index 5c15ad7e6..e432fa220 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java @@ -137,11 +137,12 @@ public class RedissonTransactionalBucket extends RedissonBucket { public RFuture touchAsync() { checkState(); RPromise result = new RedissonPromise(); + long currentThreadId = Thread.currentThread().getId(); executeLocked(result, new Runnable() { @Override public void run() { if (state != null) { - operations.add(new TouchOperation(getRawName(), getLockName())); + operations.add(new TouchOperation(getRawName(), getLockName(), currentThreadId)); result.trySuccess(state != NULL); return; } @@ -152,7 +153,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { return; } - operations.add(new TouchOperation(getRawName(), getLockName())); + operations.add(new TouchOperation(getRawName(), getLockName(), currentThreadId)); result.trySuccess(res); }); } @@ -164,11 +165,12 @@ public class RedissonTransactionalBucket extends RedissonBucket { public RFuture unlinkAsync() { checkState(); RPromise result = new RedissonPromise(); + long currentThreadId = Thread.currentThread().getId(); executeLocked(result, new Runnable() { @Override public void run() { if (state != null) { - operations.add(new UnlinkOperation(getRawName(), getLockName())); + operations.add(new UnlinkOperation(getRawName(), getLockName(), currentThreadId)); if (state == NULL) { result.trySuccess(false); } else { @@ -184,7 +186,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { return; } - operations.add(new UnlinkOperation(getRawName(), getLockName())); + operations.add(new UnlinkOperation(getRawName(), getLockName(), currentThreadId)); state = NULL; result.trySuccess(res); }); @@ -246,11 +248,12 @@ public class RedissonTransactionalBucket extends RedissonBucket { public RFuture compareAndSetAsync(V expect, V update) { checkState(); RPromise result = new RedissonPromise<>(); + long currentThreadId = Thread.currentThread().getId(); executeLocked(result, new Runnable() { @Override public void run() { if (state != null) { - operations.add(new BucketCompareAndSetOperation(getRawName(), getLockName(), getCodec(), expect, update, transactionId)); + operations.add(new BucketCompareAndSetOperation(getRawName(), getLockName(), getCodec(), expect, update, transactionId, currentThreadId)); if ((state == NULL && expect == null) || isEquals(state, expect)) { if (update == null) { @@ -271,7 +274,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { return; } - operations.add(new BucketCompareAndSetOperation(getRawName(), getLockName(), getCodec(), expect, update, transactionId)); + operations.add(new BucketCompareAndSetOperation(getRawName(), getLockName(), getCodec(), expect, update, transactionId, currentThreadId)); if ((res == null && expect == null) || isEquals(res, expect)) { if (update == null) { @@ -347,6 +350,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { public RFuture getAndDeleteAsync() { checkState(); RPromise result = new RedissonPromise(); + long currentThreadId = Thread.currentThread().getId(); executeLocked(result, new Runnable() { @Override public void run() { @@ -357,7 +361,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { } else { prevValue = state; } - operations.add(new BucketGetAndDeleteOperation(getRawName(), getLockName(), getCodec(), transactionId)); + operations.add(new BucketGetAndDeleteOperation(getRawName(), getLockName(), getCodec(), transactionId, currentThreadId)); state = NULL; result.trySuccess((V) prevValue); return; @@ -370,7 +374,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { } state = NULL; - operations.add(new BucketGetAndDeleteOperation(getRawName(), getLockName(), getCodec(), transactionId)); + operations.add(new BucketGetAndDeleteOperation(getRawName(), getLockName(), getCodec(), transactionId, currentThreadId)); result.trySuccess(res); }); } @@ -380,7 +384,8 @@ public class RedissonTransactionalBucket extends RedissonBucket { @Override public RFuture setAsync(V newValue) { - return setAsync(newValue, new BucketSetOperation(getRawName(), getLockName(), getCodec(), newValue, transactionId)); + long currentThreadId = Thread.currentThread().getId(); + return setAsync(newValue, new BucketSetOperation(getRawName(), getLockName(), getCodec(), newValue, transactionId, currentThreadId)); } private RFuture setAsync(V newValue, TransactionalOperation operation) { @@ -403,17 +408,20 @@ public class RedissonTransactionalBucket extends RedissonBucket { @Override public RFuture setAsync(V value, long timeToLive, TimeUnit timeUnit) { - return setAsync(value, new BucketSetOperation(getRawName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId)); + long currentThreadId = Thread.currentThread().getId(); + return setAsync(value, new BucketSetOperation(getRawName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId, currentThreadId)); } @Override public RFuture trySetAsync(V newValue) { - return trySet(newValue, new BucketTrySetOperation(getRawName(), getLockName(), getCodec(), newValue, transactionId)); + long currentThreadId = Thread.currentThread().getId(); + return trySet(newValue, new BucketTrySetOperation(getRawName(), getLockName(), getCodec(), newValue, transactionId, currentThreadId)); } @Override public RFuture trySetAsync(V value, long timeToLive, TimeUnit timeUnit) { - return trySet(value, new BucketTrySetOperation(getRawName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId)); + long currentThreadId = Thread.currentThread().getId(); + return trySet(value, new BucketTrySetOperation(getRawName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId, currentThreadId)); } private RFuture trySet(V newValue, TransactionalOperation operation) { @@ -460,7 +468,6 @@ public class RedissonTransactionalBucket extends RedissonBucket { return result; } - private boolean isEquals(Object value, Object oldValue) { ByteBuf valueBuf = encode(value); ByteBuf oldValueBuf = encode(oldValue); diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBuckets.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBuckets.java index b51f7b9df..509769f27 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBuckets.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBuckets.java @@ -112,9 +112,10 @@ public class RedissonTransactionalBuckets extends RedissonBuckets { checkState(); RPromise result = new RedissonPromise<>(); + long currentThreadId = Thread.currentThread().getId(); executeLocked(result, () -> { for (Entry entry : buckets.entrySet()) { - operations.add(new BucketSetOperation<>(entry.getKey(), getLockName(entry.getKey()), codec, entry.getValue(), transactionId)); + operations.add(new BucketSetOperation<>(entry.getKey(), getLockName(entry.getKey()), codec, entry.getValue(), transactionId, currentThreadId)); if (entry.getValue() == null) { state.put(entry.getKey(), NULL); } else { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/TouchOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/TouchOperation.java index 42366b467..9e0fecd1e 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/TouchOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/TouchOperation.java @@ -30,11 +30,11 @@ public class TouchOperation extends TransactionalOperation { private String lockName; public TouchOperation(String name) { - this(name, null); + this(name, null, 0); } - public TouchOperation(String name, String lockName) { - super(name, null); + public TouchOperation(String name, String lockName, long threadId) { + super(name, null, threadId); this.lockName = lockName; } @@ -43,13 +43,13 @@ public class TouchOperation extends TransactionalOperation { RKeys keys = new RedissonKeys(commandExecutor); keys.touchAsync(getName()); RedissonLock lock = new RedissonLock(commandExecutor, lockName); - lock.unlockAsync(); + lock.unlockAsync(getThreadId()); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { RedissonLock lock = new RedissonLock(commandExecutor, lockName); - lock.unlockAsync(); + lock.unlockAsync(getThreadId()); } public String getLockName() { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/TransactionalOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/TransactionalOperation.java index 1a8c77628..f612cca72 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/TransactionalOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/TransactionalOperation.java @@ -27,6 +27,7 @@ public abstract class TransactionalOperation { protected Codec codec; protected String name; + protected long threadId; public TransactionalOperation() { } @@ -35,7 +36,17 @@ public abstract class TransactionalOperation { this.name = name; this.codec = codec; } - + + public TransactionalOperation(String name, Codec codec, long threadId) { + this.name = name; + this.codec = codec; + this.threadId = threadId; + } + + public long getThreadId() { + return threadId; + } + public Codec getCodec() { return codec; } diff --git a/redisson/src/main/java/org/redisson/transaction/operation/UnlinkOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/UnlinkOperation.java index f64fdfc03..65da155fc 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/UnlinkOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/UnlinkOperation.java @@ -30,11 +30,11 @@ public class UnlinkOperation extends TransactionalOperation { private String lockName; public UnlinkOperation(String name) { - this(name, null); + this(name, null, 0); } - public UnlinkOperation(String name, String lockName) { - super(name, null); + public UnlinkOperation(String name, String lockName, long threadId) { + super(name, null, threadId); this.lockName = lockName; } @@ -44,7 +44,7 @@ public class UnlinkOperation extends TransactionalOperation { keys.unlinkAsync(getName()); if (lockName != null) { RedissonLock lock = new RedissonLock(commandExecutor, lockName); - lock.unlockAsync(); + lock.unlockAsync(getThreadId()); } } @@ -52,7 +52,7 @@ public class UnlinkOperation extends TransactionalOperation { public void rollback(CommandAsyncExecutor commandExecutor) { if (lockName != null) { RedissonLock lock = new RedissonLock(commandExecutor, lockName); - lock.unlockAsync(); + lock.unlockAsync(getThreadId()); } } diff --git a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketCompareAndSetOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketCompareAndSetOperation.java index acd955b69..582f0ddfb 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketCompareAndSetOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketCompareAndSetOperation.java @@ -35,8 +35,8 @@ public class BucketCompareAndSetOperation extends TransactionalOperation { private String lockName; private String transactionId; - public BucketCompareAndSetOperation(String name, String lockName, Codec codec, V expected, V value, String transactionId) { - super(name, codec); + public BucketCompareAndSetOperation(String name, String lockName, Codec codec, V expected, V value, String transactionId, long threadId) { + super(name, codec, threadId); this.expected = expected; this.value = value; this.lockName = lockName; @@ -48,13 +48,13 @@ public class BucketCompareAndSetOperation extends TransactionalOperation { RedissonBucket bucket = new RedissonBucket(codec, commandExecutor, name); bucket.compareAndSetAsync(expected, value); RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); - lock.unlockAsync(); + lock.unlockAsync(getThreadId()); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); - lock.unlockAsync(); + lock.unlockAsync(getThreadId()); } public V getExpected() { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndDeleteOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndDeleteOperation.java index 18b1324c3..66060ea9a 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndDeleteOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndDeleteOperation.java @@ -33,8 +33,8 @@ public class BucketGetAndDeleteOperation extends TransactionalOperation { private String lockName; private String transactionId; - public BucketGetAndDeleteOperation(String name, String lockName, Codec codec, String transactionId) { - super(name, codec); + public BucketGetAndDeleteOperation(String name, String lockName, Codec codec, String transactionId, long threadId) { + super(name, codec, threadId); this.lockName = lockName; this.transactionId = transactionId; } @@ -44,13 +44,13 @@ public class BucketGetAndDeleteOperation extends TransactionalOperation { RedissonBucket bucket = new RedissonBucket(codec, commandExecutor, name); bucket.getAndDeleteAsync(); RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); - lock.unlockAsync(); + lock.unlockAsync(getThreadId()); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); - lock.unlockAsync(); + lock.unlockAsync(getThreadId()); } public String getLockName() { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndSetOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndSetOperation.java index 9ffc4cbd7..3410f435f 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndSetOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndSetOperation.java @@ -46,7 +46,7 @@ public class BucketGetAndSetOperation extends TransactionalOperation { } public BucketGetAndSetOperation(String name, String lockName, Codec codec, Object value, String transactionId) { - super(name, codec); + super(name, codec, Thread.currentThread().getId()); this.value = value; this.lockName = lockName; this.transactionId = transactionId; @@ -61,13 +61,13 @@ public class BucketGetAndSetOperation extends TransactionalOperation { bucket.getAndSetAsync((V) value); } RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); - lock.unlockAsync(); + lock.unlockAsync(getThreadId()); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); - lock.unlockAsync(); + lock.unlockAsync(getThreadId()); } public Object getValue() { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketSetOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketSetOperation.java index afa91768a..afd401068 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketSetOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketSetOperation.java @@ -38,14 +38,14 @@ public class BucketSetOperation extends TransactionalOperation { private TimeUnit timeUnit; private String transactionId; - public BucketSetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit, String transactionId) { - this(name, lockName, codec, value, transactionId); + public BucketSetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit, String transactionId, long threadId) { + this(name, lockName, codec, value, transactionId, threadId); this.timeToLive = timeToLive; this.timeUnit = timeUnit; } - public BucketSetOperation(String name, String lockName, Codec codec, Object value, String transactionId) { - super(name, codec); + public BucketSetOperation(String name, String lockName, Codec codec, Object value, String transactionId, long threadId) { + super(name, codec, threadId); this.value = value; this.lockName = lockName; this.transactionId = transactionId; @@ -60,13 +60,13 @@ public class BucketSetOperation extends TransactionalOperation { bucket.setAsync((V) value); } RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); - lock.unlockAsync(); + lock.unlockAsync(getThreadId()); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); - lock.unlockAsync(); + lock.unlockAsync(getThreadId()); } public Object getValue() { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketTrySetOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketTrySetOperation.java index 263de1344..4d58c8bb3 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketTrySetOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketTrySetOperation.java @@ -38,14 +38,14 @@ public class BucketTrySetOperation extends TransactionalOperation { private long timeToLive; private TimeUnit timeUnit; - public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit, String transactionId) { - this(name, lockName, codec, value, transactionId); + public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit, String transactionId, long threadId) { + this(name, lockName, codec, value, transactionId, threadId); this.timeToLive = timeToLive; this.timeUnit = timeUnit; } - public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, String transactionId) { - super(name, codec); + public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, String transactionId, long threadId) { + super(name, codec, threadId); this.value = value; this.lockName = lockName; this.transactionId = transactionId; @@ -60,13 +60,13 @@ public class BucketTrySetOperation extends TransactionalOperation { bucket.trySetAsync((V) value); } RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); - lock.unlockAsync(); + lock.unlockAsync(getThreadId()); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); - lock.unlockAsync(); + lock.unlockAsync(getThreadId()); } public Object getValue() { diff --git a/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketReactiveTest.java b/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketReactiveTest.java index 33f5abdb7..9771d8539 100644 --- a/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketReactiveTest.java +++ b/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketReactiveTest.java @@ -10,9 +10,29 @@ import org.redisson.BaseReactiveTest; import org.redisson.api.RBucketReactive; import org.redisson.api.RTransactionReactive; import org.redisson.api.TransactionOptions; +import org.redisson.client.codec.LongCodec; public class RedissonTransactionalBucketReactiveTest extends BaseReactiveTest { + @Test + public void testLock() { + redisson.getBucket("e:1", LongCodec.INSTANCE).set(1L).block(); + redisson.getBucket("e:2", LongCodec.INSTANCE).set(1L).block(); + + for (int j = 0; j < 10; j++) { + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults().timeout(30, TimeUnit.SECONDS)); + RBucketReactive e1 = transaction.getBucket("e:1", LongCodec.INSTANCE); + RBucketReactive e2 = transaction.getBucket("e:2", LongCodec.INSTANCE); + e1.get().map(i -> i + 1).flatMap(e1::set).block(); + e2.get().map(i -> i - 1).flatMap(e2::set).block(); + + transaction.commit().block(); + } + + assertThat(redisson.getBucket("e:1", LongCodec.INSTANCE).get().block()).isEqualTo(11L); + assertThat(redisson.getBucket("e:2", LongCodec.INSTANCE).get().block()).isEqualTo(-9L); + } + @Test public void testTimeout() throws InterruptedException { RBucketReactive b = redisson.getBucket("test");